物联网实战:多语言(Java、Go、Rust、C++、C#、Rust)设备接入与数据处理

发布于:2025-07-03 ⋅ 阅读:(18) ⋅ 点赞:(0)

Spring Boot 物联网设备接入与数据处理实例

物联网(IoT)设备接入与数据处理是Spring Boot的常见应用场景之一。以下是一个完整的实例,涵盖设备接入、数据传输、数据处理和存储等关键环节。

设备接入

物联网设备通常通过MQTT、HTTP或WebSocket等协议接入系统。MQTT是物联网领域最常用的轻量级协议。

// MQTT配置类
@Configuration
public class MqttConfig {
    @Value("${mqtt.broker}")
    private String broker;
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] {broker});
        factory.setConnectionOptions(options);
        return factory;
    }
    
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
            new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(), "topic1");
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
}
数据处理

设备数据通常以JSON格式传输,需要反序列化为Java对象进行处理。

@Service
public class DeviceDataService {
    @Autowired
    private DeviceDataRepository repository;
    
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleMessage(Message<?> message) {
        String payload = message.getPayload().toString();
        DeviceData data = parseData(payload);
        processData(data);
        repository.save(data);
    }
    
    private DeviceData parseData(String json) {
        ObjectMapper mapper = new ObjectMapper();
        try {
            return mapper.readValue(json, DeviceData.class);
        } catch (IOException e) {
            throw new RuntimeException("数据解析失败", e);
        }
    }
    
    private void processData(DeviceData data) {
        // 数据校验、业务逻辑处理
        if(data.getTemperature() > 100) {
            alertService.sendAlert("高温警报", data.getDeviceId());
        }
    }
}
数据存储

使用Spring Data JPA持久化设备数据。

@Entity
public class DeviceData {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String deviceId;
    private double temperature;
    private double humidity;
    private LocalDateTime timestamp;
    // getters and setters
}

public interface DeviceDataRepository extends JpaRepository<DeviceData, Long> {
    List<DeviceData> findByDeviceId(String deviceId);
}
API接口

提供REST API供前端或其他系统查询设备数据。

@RestController
@RequestMapping("/api/devices")
public class DeviceController {
    @Autowired
    private DeviceDataService dataService;
    
    @GetMapping("/{deviceId}/data")
    public ResponseEntity<List<DeviceData>> getDeviceData(
            @PathVariable String deviceId,
            @RequestParam(required = false) LocalDateTime start,
            @RequestParam(required = false) LocalDateTime end) {
        List<DeviceData> data = dataService.getDataByDeviceAndTime(deviceId, start, end);
        return ResponseEntity.ok(data);
    }
}
实时监控

使用WebSocket实现设备数据的实时监控。

@Controller
public class WebSocketController {
    @Autowired
    private SimpMessagingTemplate template;
    
    @Scheduled(fixedRate = 5000)
    public void sendDeviceUpdates() {
        List<DeviceStatus> statuses = deviceService.getAllDeviceStatuses();
        template.convertAndSend("/topic/status", statuses);
    }
}
安全配置

物联网系统需要严格的安全控制。

@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
    @Override
    protected void configure(HttpSecurity http) throws Exception {
        http
            .authorizeRequests()
                .antMatchers("/api/public/**").permitAll()
                .antMatchers("/api/devices/**").hasRole("ADMIN")
                .anyRequest().authenticated()
            .and()
            .httpBasic();
    }
}
应用配置
# application.properties
mqtt.broker=tcp://iot.eclipse.org:1883
spring.datasource.url=jdbc:mysql://localhost:3306/iot_db
spring.datasource.username=root
spring.datasource.password=password
spring.jpa.hibernate.ddl-auto=update

以上实例展示了Spring Boot在物联网设备接入与数据处理中的典型应用,包括设备连接、数据处理、存储和展示等关键环节。实际项目中可能需要根据具体需求进行调整和扩展。

物联网设备接入与数据处理实例(Go语言实现)

以下是一个基于Go语言的物联网设备接入与数据处理的完整实例,包含设备接入、数据解析、存储及简单分析的核心流程。

设备接入(MQTT协议)

使用Eclipse Paho库实现MQTT客户端,接收设备上报的数据:

package main

import (
    "fmt"
    mqtt "github.com/eclipse/paho.mqtt.golang"
    "os"
    "os/signal"
    "syscall"
)

var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Received: %s from topic: %s\n", msg.Payload(), msg.Topic())
    // 调用数据处理函数
    processDeviceData(msg.Payload())
}

func main() {
    opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883")
    opts.SetClientID("go_iot_server")
    opts.SetDefaultPublishHandler(messageHandler)

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    if token := client.Subscribe("iot/device/data", 1, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

    // 保持连接
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
    <-sigChan

    client.Unsubscribe("iot/device/data")
    client.Disconnect(250)
}

数据处理(JSON解析)

假设设备上报的数据为JSON格式:

type DeviceData struct {
    DeviceID  string  `json:"device_id"`
    Timestamp int64   `json:"timestamp"`
    Temp      float32 `json:"temperature"`
    Humidity  float32 `json:"humidity"`
}

func processDeviceData(raw []byte) {
    var data DeviceData
    if err := json.Unmarshal(raw, &data); err != nil {
        fmt.Println("JSON decode error:", err)
        return
    }

    // 数据校验
    if data.Temp > 100 || data.Humidity > 100 {
        fmt.Println("Invalid sensor data")
        return
    }

    // 存入数据库
    saveToDB(data)
}
数据存储(SQLite)

使用go-sqlite3进行本地存储:

import _ "github.com/mattn/go-sqlite3"

func saveToDB(data DeviceData) {
    db, err := sql.Open("sqlite3", "./iot.db")
    if err != nil {
        fmt.Println("DB open error:", err)
        return
    }
    defer db.Close()

    stmt, _ := db.Prepare(`INSERT INTO device_data 
        (device_id, timestamp, temperature, humidity) VALUES (?, ?, ?, ?)`)
    _, err = stmt.Exec(data.DeviceID, data.Timestamp, data.Temp, data.Humidity)
    if err != nil {
        fmt.Println("DB insert error:", err)
    }
}


数据分析(简单聚合)

计算指定设备的平均温度:

func getAvgTemperature(deviceID string) float32 {
    db, _ := sql.Open("sqlite3", "./iot.db")
    defer db.Close()

    var avg float32
    row := db.QueryRow(`SELECT AVG(temperature) 
        FROM device_data WHERE device_id = ?`, deviceID)
    row.Scan(&avg)
    return avg
}


扩展建议
  1. 协议扩展:支持CoAP、HTTP等协议接入
  2. 数据管道:结合Kafka或RabbitMQ实现高吞吐量处理
  3. 边缘计算:在网关层使用Go实现数据预处理
  4. 可视化:集成Grafana通过API展示数据

关键依赖
github.com/eclipse/paho.mqtt.golang  # MQTT客户端
github.com/mattn/go-sqlite3          # SQLite驱动
encoding/json                        # JSON处理

该实例展示了从设备接入到数据处理的完整链路,实际部署时需要根据具体场景调整错误处理、安全认证和性能优化策略。

物联网设备接入与数据处理实例(Python实现)

物联网设备接入与数据处理通常涉及设备连接、数据采集、传输、存储和分析等环节。以下是一个基于Python的完整实现示例,涵盖MQTT协议通信、数据存储和简单分析。

MQTT协议实现设备接入

安装MQTT客户端库:

pip install paho-mqtt

模拟设备发布数据:

import paho.mqtt.client as mqtt
import json
import time

broker = "mqtt.eclipseprojects.io"  # 公共测试服务器
topic = "iot/sensor/temperature"

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker, 1883, 60)

while True:
    payload = {
        "device_id": "sensor_001",
        "value": round(25 + (5 * (0.5 - random.random())), 2),
        "timestamp": int(time.time())
    }
    client.publish(topic, json.dumps(payload))
    time.sleep(5)

服务器端订阅数据:

def on_message(client, userdata, msg):
    data = json.loads(msg.payload.decode())
    print(f"Received: {data}")

client = mqtt.Client()
client.connect(broker, 1883, 60)
client.subscribe("iot/sensor/#")
client.on_message = on_message
client.loop_forever()

数据存储方案

使用SQLite进行本地存储:

import sqlite3

def init_db():
    conn = sqlite3.connect('iot_data.db')
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS sensor_data
                 (id INTEGER PRIMARY KEY AUTOINCREMENT,
                  device_id TEXT,
                  value REAL,
                  timestamp INTEGER)''')
    conn.commit()
    conn.close()

def save_data(data):
    conn = sqlite3.connect('iot_data.db')
    c = conn.cursor()
    c.execute("INSERT INTO sensor_data (device_id, value, timestamp) VALUES (?, ?, ?)",
              (data['device_id'], data['value'], data['timestamp']))
    conn.commit()
    conn.close()

数据分析处理

使用Pandas进行数据分析:

import pandas as pd
from datetime import datetime

def analyze_data():
    conn = sqlite3.connect('iot_data.db')
    df = pd.read_sql_query("SELECT * FROM sensor_data", conn)
    conn.close()
    
    df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')
    df.set_index('datetime', inplace=True)
    
    # 按小时聚合平均值
    hourly_avg = df.resample('H')['value'].mean()
    print(hourly_avg)
    
    # 异常值检测
    mean = df['value'].mean()
    std = df['value'].std()
    anomalies = df[(df['value'] > mean + 2*std) | (df['value'] < mean - 2*std)]
    print(f"Detected {len(anomalies)} anomalies")

可视化展示

使用Matplotlib进行数据可视化:

import matplotlib.pyplot as plt

def plot_data():
    conn = sqlite3.connect('iot_data.db')
    df = pd.read_sql_query("SELECT * FROM sensor_data", conn)
    conn.close()
    
    plt.figure(figsize=(12, 6))
    plt.plot(pd.to_datetime(df['timestamp'], unit='s'), df['value'])
    plt.title("Sensor Data Trend")
    plt.xlabel("Time")
    plt.ylabel("Temperature")
    plt.grid()
    plt.show()

完整数据处理流程整合
class Io

网站公告

今日签到

点亮在社区的每一天
去签到