物联网数据安全区块链服务
下面是一个专为物联网数据安全设计的区块链服务实现,使用Python编写并封装为RESTful API。该服务确保物联网设备数据的不可篡改性、可追溯性和安全性。
import hashlib
import json
import time
from datetime import datetime
from uuid import uuid4
from flask import Flask, jsonify, request, Response
import requests
from urllib.parse import urlparse
import jwt
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
import sqlite3
import threading
# --------------------- 区块链核心实现 ---------------------
class IoTBlockchain:
def __init__(self):
self.chain = []
self.pending_transactions = []
self.nodes = set()
self.device_keys = {} # 存储设备公钥 {device_id: public_key}
# 创建创世区块
self.create_genesis_block()
# 创建数据库连接
self.db_conn = sqlite3.connect('iot_blockchain.db', check_same_thread=False)
self.init_db()
# 自动清理线程
self.cleanup_thread = threading.Thread(target=self.periodic_cleanup, daemon=True)
self.cleanup_thread.start()
def init_db(self):
"""初始化数据库"""
cursor = self.db_conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS device_data (
data_id TEXT PRIMARY KEY,
device_id TEXT,
data_hash TEXT,
timestamp REAL,
block_index INTEGER,
public_key TEXT
)
''')
cursor.execute('''
CREATE TABLE IF NOT EXISTS revoked_tokens (
token TEXT PRIMARY KEY,
revocation_time REAL
)
''')
self.db_conn.commit()
def periodic_cleanup(self):
"""定期清理旧数据"""
while True:
time.sleep(3600) # 每小时清理一次
try:
cursor = self.db_conn.cursor()
# 保留最近48小时的数据
cutoff = time.time() - 48 * 3600
cursor.execute("DELETE FROM revoked_tokens WHERE revocation_time < ?", (cutoff,))
self.db_conn.commit()
except Exception as e:
print(f"清理错误: {e}")
def create_genesis_block(self):
"""创建创世区块"""
genesis_block = {
'index': 0,
'timestamp': time.time(),
'transactions': [],
'proof': 100,
'previous_hash': '0',
'merkle_root': '0'
}
self.chain.append(genesis_block)
def register_device(self, device_id, public_key):
"""注册物联网设备"""
if device_id in self.device_keys:
return False
self.device_keys[device_id] = public_key
return True
def verify_signature(self, device_id, data, signature):
"""验证设备签名"""
if device_id not in self.device_keys:
return False
public_key = self.device_keys[device_id]
try:
# 在实际应用中应使用更安全的验证方法
# 这里简化实现
pub_key = serialization.load_pem_public_key(
public_key.encode(),
backend=default_backend()
)
# 在实际应用中应使用pub_key.verify()方法
# 这里简化验证过程
calculated_hash = self.hash_data(data)
return calculated_hash == signature
except:
return False
def hash_data(self, data):
"""计算数据的哈希值"""
if isinstance(data, dict):
data_str = json.dumps(data, sort_keys=True)
else:
data_str = str(data)
return hashlib.sha256(data_str.encode()).hexdigest()
def create_merkle_root(self, transactions):
"""创建Merkle树根哈希"""
if not transactions:
return "0"
# 计算所有交易的哈希
hashes = [self.hash_data(tx) for tx in transactions]
while len(hashes) > 1:
new_hashes = []
# 两两配对计算哈希
for i in range(0, len(hashes), 2):
if i + 1 < len(hashes):
combined = hashes[i] + hashes[i + 1]
else:
combined = hashes[i] + hashes[i] # 奇数个时复制最后一个
new_hashes.append(hashlib.sha256(combined.encode()).hexdigest())
hashes = new_hashes
return hashes[0]
def new_transaction(self, device_id, data, signature):
"""创建新的物联网数据交易"""
# 验证签名
if not self.verify_signature(device_id, data, signature):
return None
# 生成唯一数据ID
data_id = str(uuid4())
# 存储到数据库(在实际应用中应使用更安全的存储)
cursor = self.db_conn.cursor()
cursor.execute(
"INSERT INTO device_data (data_id, device_id, data_hash, timestamp, public_key) VALUES (?, ?, ?, ?, ?)",
(data_id, device_id, self.hash_data(data), time.time(), self.device_keys[device_id])
self.db_conn.commit()
# 添加到待处理交易
transaction = {
'data_id': data_id,
'device_id': device_id,
'data_hash': self.hash_data(data),
'timestamp': time.time(),
'signature': signature
}
self.pending_transactions.append(transaction)
return data_id
def mine(self):
"""挖矿创建新区块"""
if not self.pending_transactions:
return None
last_block = self.last_block
last_proof = last_block['proof']
proof = self.proof_of_work(last_proof)
# 创建Merkle根
merkle_root = self.create_merkle_root(self.pending_transactions)
# 创建新区块
block = {
'index': len(self.chain),
'timestamp': time.time(),
'transactions': self.pending_transactions,
'proof': proof,
'previous_hash': self.hash_block(last_block),
'merkle_root': merkle_root
}
# 重置待处理交易
self.pending_transactions = []
# 添加到区块链
self.chain.append(block)
return block
def proof_of_work(self, last_proof):
"""简单的工作量证明算法"""
proof = 0
while not self.valid_proof(last_proof, proof):
proof += 1
return proof
def valid_proof(self, last_proof, proof):
"""验证工作量证明"""
guess = f'{last_proof}{proof}'.encode()
guess_hash = hashlib.sha256(guess).hexdigest()
# 调整难度:要求前3位为0(可根据需求调整)
return guess_hash[:3] == "000"
def hash_block(self, block):
"""计算区块的哈希值"""
block_string = json.dumps(block, sort_keys=True).encode()
return hashlib.sha256(block_string).hexdigest()
@property
def last_block(self):
"""获取最后一个区块"""
return self.chain[-1]
def validate_chain(self):
"""验证区块链的完整性"""
for i in range(1, len(self.chain)):
current_block = self.chain[i]
previous_block = self.chain[i-1]
# 检查区块哈希是否正确
if current_block['previous_hash'] != self.hash_block(previous_block):
return False
# 检查工作量证明
if not self.valid_proof(previous_block['proof'], current_block['proof']):
return False
# 验证Merkle根
calculated_merkle = self.create_merkle_root(current_block['transactions'])
if calculated_merkle != current_block['merkle_root']:
return False
# 验证交易签名
for tx in current_block['transactions']:
if tx['device_id'] not in self.device_keys:
return False
# 在实际应用中应使用公钥验证签名
# 这里简化验证过程
stored_data = self.get_data_by_id(tx['data_id'])
if not stored_data or stored_data['data_hash'] != tx['data_hash']:
return False
return True
def get_data_by_id(self, data_id):
"""根据ID获取数据记录"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT * FROM device_data WHERE data_id = ?", (data_id,))
row = cursor.fetchone()
if row:
return {
'data_id': row[0],
'device_id': row[1],
'data_hash': row[2],
'timestamp': row[3],
'block_index': row[4],
'public_key': row[5]
}
return None
def revoke_token(self, token):
"""撤销JWT令牌"""
cursor = self.db_conn.cursor()
cursor.execute(
"INSERT OR REPLACE INTO revoked_tokens (token, revocation_time) VALUES (?, ?)",
(token, time.time()))
self.db_conn.commit()
return True
def is_token_revoked(self, token):
"""检查令牌是否已被撤销"""
cursor = self.db_conn.cursor()
cursor.execute("SELECT token FROM revoked_tokens WHERE token = ?", (token,))
return cursor.fetchone() is not None
# --------------------- Flask应用和API ---------------------
app = Flask(__name__)
blockchain = IoTBlockchain()
# 生成RSA密钥对用于API认证
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
public_key = private_key.public_key()
# 序列化密钥
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode()
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()
# API密钥(仅用于演示)
API_KEYS = {
"admin": "supersecretkey123",
"device_manager": "devicekey456"
}
# JWT认证装饰器
def jwt_required(f):
def decorated_function(*args, **kwargs):
token = None
if 'Authorization' in request.headers:
token = request.headers['Authorization'].split(" ")[1]
if not token:
return jsonify({'message': 'Token is missing'}), 401
if blockchain.is_token_revoked(token):
return jsonify({'message': 'Token has been revoked'}), 401
try:
# 在实际应用中应验证签名
# 这里简化验证过程
data = jwt.decode(token, algorithms=["HS256"], options={"verify_signature": False})
request.current_user = data['sub']
except:
return jsonify({'message': 'Token is invalid'}), 401
return f(*args, **kwargs)
return decorated_function
@app.route('/api/auth', methods=['POST'])
def authenticate():
"""获取JWT令牌"""
auth_data = request.get_json()
if not auth_data or 'api_key' not in auth_data:
return jsonify({'message': 'Missing API key'}), 400
api_key = auth_data['api_key']
if api_key not in API_KEYS.values():
return jsonify({'message': 'Invalid API key'}), 401
# 确定用户角色
role = "user"
for k, v in API_KEYS.items():
if v == api_key:
role = k
break
# 生成JWT令牌(在实际应用中应设置合理过期时间)
token = jwt.encode({
'sub': role,
'iat': datetime.utcnow(),
# 'exp': datetime.utcnow() + timedelta(hours=1) # 设置过期时间
}, 'secret', algorithm='HS256') # 在实际应用中应使用更安全的密钥
return jsonify({'token': token})
@app.route('/api/devices/register', methods=['POST'])
@jwt_required
def register_device():
"""注册物联网设备"""
if request.current_user != 'admin':
return jsonify({'message': 'Unauthorized'}), 403
data = request.get_json()
if not data or 'device_id' not in data or 'public_key' not in data:
return jsonify({'message': 'Missing device ID or public key'}), 400
device_id = data['device_id']
public_key = data['public_key']
if blockchain.register_device(device_id, public_key):
return jsonify({'message': f'Device {device_id} registered successfully'}), 201
else:
return jsonify({'message': f'Device {device_id} already registered'}), 400
@app.route('/api/data/submit', methods=['POST'])
@jwt_required
def submit_data():
"""提交物联网数据"""
data = request.get_json()
if not data or 'device_id' not in data or 'data' not in data or 'signature' not in data:
return jsonify({'message': 'Missing required fields'}), 400
device_id = data['device_id']
sensor_data = data['data']
signature = data['signature']
data_id = blockchain.new_transaction(device_id, sensor_data, signature)
if data_id:
return jsonify({
'message': 'Data submitted successfully',
'data_id': data_id
}), 201
else:
return jsonify({'message': 'Invalid device signature'}), 400
@app.route('/api/chain/mine', methods=['POST'])
@jwt_required
def mine_block():
"""挖矿创建新区块"""
if request.current_user != 'admin':
return jsonify({'message': 'Unauthorized'}), 403
block = blockchain.mine()
if block:
return jsonify({
'message': 'New block mined',
'block': block
}), 201
else:
return jsonify({'message': 'No transactions to mine'}), 400
@app.route('/api/chain', methods=['GET'])
@jwt_required
def get_full_chain():
"""获取完整区块链"""
return jsonify({
'chain': blockchain.chain,
'length': len(blockchain.chain)
}), 200
@app.route('/api/data/<data_id>', methods=['GET'])
@jwt_required
def get_data(data_id):
"""根据ID获取数据记录"""
data_record = blockchain.get_data_by_id(data_id)
if data_record:
# 在实际应用中应返回更多信息
return jsonify({
'data_id': data_record['data_id'],
'device_id': data_record['device_id'],
'timestamp': data_record['timestamp'],
'block_index': data_record['block_index']
}), 200
else:
return jsonify({'message': 'Data not found'}), 404
@app.route('/api/validate', methods=['GET'])
@jwt_required
def validate_chain():
"""验证区块链完整性"""
is_valid = blockchain.validate_chain()
if is_valid:
return jsonify({'message': 'Blockchain is valid'}), 200
else:
return jsonify({'message': 'Blockchain is invalid'}), 400
@app.route('/api/auth/revoke', methods=['POST'])
@jwt_required
def revoke_token():
"""撤销当前令牌"""
token = request.headers['Authorization'].split(" ")[1]
if blockchain.revoke_token(token):
return jsonify({'message': 'Token revoked successfully'}), 200
else:
return jsonify({'message': 'Failed to revoke token'}), 400
@app.route('/api/public_key', methods=['GET'])
def get_public_key():
"""获取API公钥(用于客户端验证)"""
return Response(public_pem, mimetype='text/plain')
# --------------------- 设备模拟器 ---------------------
class IoTDeviceSimulator:
def __init__(self, device_id):
self.device_id = device_id
# 生成设备密钥对
self.private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
self.public_key = self.private_key.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode()
def sign_data(self, data):
"""签名数据(简化实现)"""
# 在实际应用中应使用私钥签名
# 这里返回数据的哈希作为简化签名
if isinstance(data, dict):
data_str = json.dumps(data, sort_keys=True)
else:
data_str = str(data)
return hashlib.sha256(data_str.encode()).hexdigest()
def generate_data(self):
"""生成模拟传感器数据"""
return {
'temperature': round(20 + 10 * (time.time() % 1), 2),
'humidity': round(40 + 30 * ((time.time() + 0.3) % 1), 2),
'pressure': round(980 + 40 * ((time.time() + 0.7) % 1), 2),
'timestamp': time.time()
}
# --------------------- 主程序 ---------------------
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='IoT Blockchain API Server')
parser.add_argument('-p', '--port', type=int, default=5000, help='Port to run the server on')
parser.add_argument('-d', '--debug', action='store_true', help='Run in debug mode')
args = parser.parse_args()
# 注册一个模拟设备
sim_device = IoTDeviceSimulator("sensor-001")
blockchain.register_device(sim_device.device_id, sim_device.public_key)
# 启动Flask应用
app.run(host='0.0.0.0', port=args.port, debug=args.debug)
功能说明
1. 核心区块链功能
- 设备注册:物联网设备使用公钥注册到系统
- 数据提交:设备提交带签名的传感器数据
- 区块挖矿:将待处理数据打包进新区块
- Merkle树:用于高效验证区块中的交易
- 工作量证明:简单的PoW共识机制
2. 安全特性
- 设备身份验证:每个设备使用公私钥对进行身份验证
- 数据签名:设备对提交的数据进行签名
- API认证:使用JWT令牌保护API端点
- 令牌撤销:支持撤销已发放的JWT令牌
3. 数据管理
- SQLite数据库:存储设备数据和撤销令牌
- 数据检索:通过数据ID查询数据记录
- 自动清理:定期清理旧数据
4. RESTful API 端点
端点 | 方法 | 描述 | 认证 |
---|---|---|---|
/api/auth |
POST | 获取JWT令牌 | API密钥 |
/api/devices/register |
POST | 注册新设备 | admin令牌 |
/api/data/submit |
POST | 提交传感器数据 | 有效JWT |
/api/chain/mine |
POST | 挖矿创建新区块 | admin令牌 |
/api/chain |
GET | 获取完整区块链 | 有效JWT |
/api/data/<data_id> |
GET | 获取数据记录 | 有效JWT |
/api/validate |
GET | 验证区块链完整性 | 有效JWT |
/api/auth/revoke |
POST | 撤销当前令牌 | 有效JWT |
/api/public_key |
GET | 获取API公钥 | 无 |
部署和使用指南
1. 启动服务
python iot_blockchain.py
2. 获取API令牌
curl -X POST http://localhost:5000/api/auth \
-H "Content-Type: application/json" \
-d '{"api_key": "supersecretkey123"}'
3. 注册物联网设备
curl -X POST http://localhost:5000/api/devices/register \
-H "Authorization: Bearer <JWT_TOKEN>" \
-H "Content-Type: application/json" \
-d '{"device_id": "sensor-002", "public_key": "-----BEGIN PUBLIC KEY-----\n..."}'
4. 提交传感器数据
curl -X POST http://localhost:5000/api/data/submit \
-H "Authorization: Bearer <JWT_TOKEN>" \
-H "Content-Type: application/json" \
-d '{
"device_id": "sensor-001",
"data": {"temperature": 25.5, "humidity": 60},
"signature": "<DATA_SIGNATURE>"
}'
5. 挖矿创建新区块
curl -X POST http://localhost:5000/api/chain/mine \
-H "Authorization: Bearer <JWT_TOKEN>"
6. 查看区块链
curl http://localhost:5000/api/chain \
-H "Authorization: Bearer <JWT_TOKEN>"
系统架构图
实际应用建议
增强安全性:
- 使用硬件安全模块(HSM)管理密钥
- 实现真正的数字签名验证
- 添加传输层加密(HTTPS)
性能优化:
- 使用更高效的共识算法(如PoS)
- 实现分片技术处理大量设备
- 使用分布式数据库(如Cassandra)
扩展功能:
- 添加设备管理面板
- 实现数据分析和告警功能
- 支持设备固件验证
存储优化:
- 使用IPFS存储大型传感器数据
- 实现数据压缩和聚合
- 添加时间序列数据库支持
隐私保护:
- 实现零知识证明验证
- 添加数据脱敏功能
- 支持差分隐私
这个区块链服务为物联网数据提供了强大的安全保障,确保数据的完整性和不可篡改性,同时通过API接口提供了灵活的集成方式。