Spring Boot 物联网设备接入与数据处理实例
物联网(IoT)设备接入与数据处理是Spring Boot的常见应用场景之一。以下是一个完整的实例,涵盖设备接入、数据传输、数据处理和存储等关键环节。
设备接入
物联网设备通常通过MQTT、HTTP或WebSocket等协议接入系统。MQTT是物联网领域最常用的轻量级协议。
// MQTT配置类
@Configuration
public class MqttConfig {
@Value("${mqtt.broker}")
private String broker;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] {broker});
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(), "topic1");
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
}
数据处理
设备数据通常以JSON格式传输,需要反序列化为Java对象进行处理。
@Service
public class DeviceDataService {
@Autowired
private DeviceDataRepository repository;
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(Message<?> message) {
String payload = message.getPayload().toString();
DeviceData data = parseData(payload);
processData(data);
repository.save(data);
}
private DeviceData parseData(String json) {
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.readValue(json, DeviceData.class);
} catch (IOException e) {
throw new RuntimeException("数据解析失败", e);
}
}
private void processData(DeviceData data) {
// 数据校验、业务逻辑处理
if(data.getTemperature() > 100) {
alertService.sendAlert("高温警报", data.getDeviceId());
}
}
}
数据存储
使用Spring Data JPA持久化设备数据。
@Entity
public class DeviceData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String deviceId;
private double temperature;
private double humidity;
private LocalDateTime timestamp;
// getters and setters
}
public interface DeviceDataRepository extends JpaRepository<DeviceData, Long> {
List<DeviceData> findByDeviceId(String deviceId);
}
API接口
提供REST API供前端或其他系统查询设备数据。
@RestController
@RequestMapping("/api/devices")
public class DeviceController {
@Autowired
private DeviceDataService dataService;
@GetMapping("/{deviceId}/data")
public ResponseEntity<List<DeviceData>> getDeviceData(
@PathVariable String deviceId,
@RequestParam(required = false) LocalDateTime start,
@RequestParam(required = false) LocalDateTime end) {
List<DeviceData> data = dataService.getDataByDeviceAndTime(deviceId, start, end);
return ResponseEntity.ok(data);
}
}
实时监控
使用WebSocket实现设备数据的实时监控。
@Controller
public class WebSocketController {
@Autowired
private SimpMessagingTemplate template;
@Scheduled(fixedRate = 5000)
public void sendDeviceUpdates() {
List<DeviceStatus> statuses = deviceService.getAllDeviceStatuses();
template.convertAndSend("/topic/status", statuses);
}
}
安全配置
物联网系统需要严格的安全控制。
@Configuration
@EnableWebSecurity
public class SecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/api/public/**").permitAll()
.antMatchers("/api/devices/**").hasRole("ADMIN")
.anyRequest().authenticated()
.and()
.httpBasic();
}
}
应用配置
# application.properties
mqtt.broker=tcp://iot.eclipse.org:1883
spring.datasource.url=jdbc:mysql://localhost:3306/iot_db
spring.datasource.username=root
spring.datasource.password=password
spring.jpa.hibernate.ddl-auto=update
以上实例展示了Spring Boot在物联网设备接入与数据处理中的典型应用,包括设备连接、数据处理、存储和展示等关键环节。实际项目中可能需要根据具体需求进行调整和扩展。
物联网设备接入与数据处理实例(Go语言实现)
以下是一个基于Go语言的物联网设备接入与数据处理的完整实例,包含设备接入、数据解析、存储及简单分析的核心流程。
设备接入(MQTT协议)
使用Eclipse Paho
库实现MQTT客户端,接收设备上报的数据:
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"os"
"os/signal"
"syscall"
)
var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received: %s from topic: %s\n", msg.Payload(), msg.Topic())
// 调用数据处理函数
processDeviceData(msg.Payload())
}
func main() {
opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883")
opts.SetClientID("go_iot_server")
opts.SetDefaultPublishHandler(messageHandler)
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if token := client.Subscribe("iot/device/data", 1, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
// 保持连接
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
client.Unsubscribe("iot/device/data")
client.Disconnect(250)
}
数据处理(JSON解析)
假设设备上报的数据为JSON格式:
type DeviceData struct {
DeviceID string `json:"device_id"`
Timestamp int64 `json:"timestamp"`
Temp float32 `json:"temperature"`
Humidity float32 `json:"humidity"`
}
func processDeviceData(raw []byte) {
var data DeviceData
if err := json.Unmarshal(raw, &data); err != nil {
fmt.Println("JSON decode error:", err)
return
}
// 数据校验
if data.Temp > 100 || data.Humidity > 100 {
fmt.Println("Invalid sensor data")
return
}
// 存入数据库
saveToDB(data)
}
数据存储(SQLite)
使用go-sqlite3
进行本地存储:
import _ "github.com/mattn/go-sqlite3"
func saveToDB(data DeviceData) {
db, err := sql.Open("sqlite3", "./iot.db")
if err != nil {
fmt.Println("DB open error:", err)
return
}
defer db.Close()
stmt, _ := db.Prepare(`INSERT INTO device_data
(device_id, timestamp, temperature, humidity) VALUES (?, ?, ?, ?)`)
_, err = stmt.Exec(data.DeviceID, data.Timestamp, data.Temp, data.Humidity)
if err != nil {
fmt.Println("DB insert error:", err)
}
}
数据分析(简单聚合)
计算指定设备的平均温度:
func getAvgTemperature(deviceID string) float32 {
db, _ := sql.Open("sqlite3", "./iot.db")
defer db.Close()
var avg float32
row := db.QueryRow(`SELECT AVG(temperature)
FROM device_data WHERE device_id = ?`, deviceID)
row.Scan(&avg)
return avg
}
扩展建议
- 协议扩展:支持CoAP、HTTP等协议接入
- 数据管道:结合Kafka或RabbitMQ实现高吞吐量处理
- 边缘计算:在网关层使用Go实现数据预处理
- 可视化:集成Grafana通过API展示数据
关键依赖
github.com/eclipse/paho.mqtt.golang # MQTT客户端
github.com/mattn/go-sqlite3 # SQLite驱动
encoding/json # JSON处理
该实例展示了从设备接入到数据处理的完整链路,实际部署时需要根据具体场景调整错误处理、安全认证和性能优化策略。
物联网设备接入与数据处理实例(Python实现)
物联网设备接入与数据处理通常涉及设备连接、数据采集、传输、存储和分析等环节。以下是一个基于Python的完整实现示例,涵盖MQTT协议通信、数据存储和简单分析。
MQTT协议实现设备接入
安装MQTT客户端库:
pip install paho-mqtt
模拟设备发布数据:
import paho.mqtt.client as mqtt
import json
import time
broker = "mqtt.eclipseprojects.io" # 公共测试服务器
topic = "iot/sensor/temperature"
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
client = mqtt.Client()
client.on_connect = on_connect
client.connect(broker, 1883, 60)
while True:
payload = {
"device_id": "sensor_001",
"value": round(25 + (5 * (0.5 - random.random())), 2),
"timestamp": int(time.time())
}
client.publish(topic, json.dumps(payload))
time.sleep(5)
服务器端订阅数据:
def on_message(client, userdata, msg):
data = json.loads(msg.payload.decode())
print(f"Received: {data}")
client = mqtt.Client()
client.connect(broker, 1883, 60)
client.subscribe("iot/sensor/#")
client.on_message = on_message
client.loop_forever()
数据存储方案
使用SQLite进行本地存储:
import sqlite3
def init_db():
conn = sqlite3.connect('iot_data.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS sensor_data
(id INTEGER PRIMARY KEY AUTOINCREMENT,
device_id TEXT,
value REAL,
timestamp INTEGER)''')
conn.commit()
conn.close()
def save_data(data):
conn = sqlite3.connect('iot_data.db')
c = conn.cursor()
c.execute("INSERT INTO sensor_data (device_id, value, timestamp) VALUES (?, ?, ?)",
(data['device_id'], data['value'], data['timestamp']))
conn.commit()
conn.close()
数据分析处理
使用Pandas进行数据分析:
import pandas as pd
from datetime import datetime
def analyze_data():
conn = sqlite3.connect('iot_data.db')
df = pd.read_sql_query("SELECT * FROM sensor_data", conn)
conn.close()
df['datetime'] = pd.to_datetime(df['timestamp'], unit='s')
df.set_index('datetime', inplace=True)
# 按小时聚合平均值
hourly_avg = df.resample('H')['value'].mean()
print(hourly_avg)
# 异常值检测
mean = df['value'].mean()
std = df['value'].std()
anomalies = df[(df['value'] > mean + 2*std) | (df['value'] < mean - 2*std)]
print(f"Detected {len(anomalies)} anomalies")
可视化展示
使用Matplotlib进行数据可视化:
import matplotlib.pyplot as plt
def plot_data():
conn = sqlite3.connect('iot_data.db')
df = pd.read_sql_query("SELECT * FROM sensor_data", conn)
conn.close()
plt.figure(figsize=(12, 6))
plt.plot(pd.to_datetime(df['timestamp'], unit='s'), df['value'])
plt.title("Sensor Data Trend")
plt.xlabel("Time")
plt.ylabel("Temperature")
plt.grid()
plt.show()
完整数据处理流程整合
class Io