Flink 实现超速监控:从 Kafka 读取卡口数据写入 MySQL

发布于:2024-11-28 ⋅ 阅读:(12) ⋅ 点赞:(0)

目录

1. 背景

2. 实现需求

2.1 数据格式

2.2 超速判断规则

3. 实现步骤

3.1 创建 Kafka Topic

3.2 准备数据发送工具

3.3 Flink 实现代码

4. 代码说明

5. 项目运行验证

6. 总结


1. 背景

在智慧交通项目中,监控车辆是否超速是一个常见的需求。通过 Flink 处理流数据,可以实时监控车辆通过卡口时的速度,并将超速车辆信息写入数据库供后续分析。

本文将展示如何从 Kafka 的 topic-car 中读取车辆卡口数据,筛选出超速车辆,并将其信息写入 MySQL 数据库。


2. 实现需求

2.1 数据格式

  • Kafka 数据格式(JSON 示例):
{"action_time": 1682219447, "monitor_id": "0001", "camera_id": "1", "car": "豫A12345", "speed": 34.5, "road_id": "01", "area_id": "20"}
  • MySQL 表结构:
CREATE TABLE t_speeding_info (
    id INT AUTO_INCREMENT PRIMARY KEY,
    car VARCHAR(255) NOT NULL,
    monitor_id VARCHAR(255),
    road_id VARCHAR(255),
    real_speed DOUBLE,
    limit_speed INT,
    action_time BIGINT
);
DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (
  `monitor_id` varchar(255) NOT NULL,  
  `road_id` varchar(255) NOT NULL,
  `speed_limit` int(11) DEFAULT NULL,
  `area_id` varchar(255) DEFAULT NULL,
   PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('0000', '02', 60, '01');
INSERT INTO `t_monitor_info` VALUES ('0001', '02', 60, '02');
INSERT INTO `t_monitor_info` VALUES ('0002', '03', 80, '01');
INSERT INTO `t_monitor_info` VALUES ('0004', '05', 100, '03');
INSERT INTO `t_monitor_info` VALUES ('0005', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0021', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0023', '05', 0, NULL);

2.2 超速判断规则

当车辆通过卡口时的 车速(speed)超过限速(limitSpeed)的 1.2 倍,即视为超速,将数据写入 t_speeding_info 表。


3. 实现步骤

3.1 创建 Kafka Topic

在 Kafka 中创建一个名为 topic-car 的主题:

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic-car

3.2 准备数据发送工具

通过 Kafka Producer 向 topic-car 发送数据:

kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic-car

示例数据:

{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}

3.3 Flink 实现代码

3.3.1 定义数据模型
package com.bigdata.day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CarInfo {
    private long actionTime;
    private String monitorId;
    private String cameraId;
    private String car;
    private double speed;
    private String roadId;
    private String areaId;

    // 限速字段,用于超速判断
    private double limitSpeed;
}
3.3.2 Flink 处理逻辑
package com.bigdata.windows;

import com.bigdata.day05.CarInfo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import com.alibaba.fastjson.JSON;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class SpeedingMonitor {
    public static void main(String[] args) throws Exception {
        // 1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 配置 Kafka 消费者
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092");
        kafkaProps.setProperty("group.id", "car-monitor-group");
        
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "topic-car",
                new SimpleStringSchema(),
                kafkaProps
        );

        // 3. 从 Kafka 中读取数据流
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // 4. 数据处理:解析 JSON,判断超速
        DataStream<CarInfo> filteredStream = kafkaStream
                .map(line -> {
                    // 将 JSON 数据转换为 CarInfo 对象
                    CarInfo carInfo = JSON.parseObject(line, CarInfo.class);
                    carInfo.setLimitSpeed(120); // 设置卡口限速(假定限速为 120)
                    return carInfo;
                })
                .filter(carInfo -> carInfo.getSpeed() > carInfo.getLimitSpeed() * 1.2); // 超速判断

        // 打印超速车辆信息
        filteredStream.print();

        // 5. 数据写入 MySQL
        filteredStream.addSink(JdbcSink.sink(
                "INSERT INTO t_speeding_info (car, monitor_id, road_id, real_speed, limit_speed, action_time) VALUES (?, ?, ?, ?, ?, ?)",
                (PreparedStatement ps, CarInfo carInfo) -> {
                    ps.setString(1, carInfo.getCar());
                    ps.setString(2, carInfo.getMonitorId());
                    ps.setString(3, carInfo.getRoadId());
                    ps.setDouble(4, carInfo.getSpeed());
                    ps.setInt(5, (int) carInfo.getLimitSpeed());
                    ps.setLong(6, carInfo.getActionTime());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1) // 每次写入一条数据
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://localhost:3306/smart_traffic")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        // 6. 启动任务
        env.execute("Speeding Monitor");
    }
}

4. 代码说明

  1. Kafka 数据流处理

    • 使用 FlinkKafkaConsumer 从 Kafka 读取实时流数据。
    • 通过 FastJSON 将 JSON 数据解析为 Java 对象。
  2. 超速判断逻辑

    • 使用 .filter() 对流数据进行过滤,筛选超速车辆。
  3. MySQL Sink

    • 使用 Flink 的 JdbcSink 将超速数据写入 MySQL 表 t_speeding_info

5. 项目运行验证

5.1 启动 Flink 程序

在 IDEA 中运行 SpeedingMonitor 程序,确保 MySQL 服务正常运行。

5.2 发送测试数据

通过 Kafka Producer 向 topic-car 发送数据:

{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":145.0,"road_id":"01","area_id":"20"}

5.3 验证 MySQL 数据

查询 MySQL 表 t_speeding_info

SELECT * FROM t_speeding_info;

结果示例:

id car monitor_id road_id real_speed limit_speed action_time
1 豫A12345 0001 01 145.0 120 1682219447

6. 总结

通过本文,完整实现了从 Kafka 读取车辆卡口数据,筛选出超速车辆并写入 MySQL 的流程。使用 Flink 和 Kafka 的实时处理能力,可以轻松构建高效的智慧交通系统。

如果本文对你有帮助,请点赞、收藏,并分享给更多人! 😊