http数据传输确保完整性和保密性整流程方案(含源码)

发布于:2024-07-04 ⋅ 阅读:(17) ⋅ 点赞:(0)

在这里插入图片描述
往期文章回顾

1. 本文摘要

  • 当今数据安全越来越重要,http网络请求数据安全加强要求是为了保护公民的隐私和数据安全,防范日益复杂的网络攻击,确保电子商务和在线服务的可靠性,并遵守相关法律法规。
  • 上一篇文章中,结合我参与的项目,粗略描述了在http网络请求中如何做到数据安全,并附了一些源码
  • 本文主要介绍"http请求中数据传输的完整性和传输过程中数据的保密性",这部分如何快速融入到已有的工程中,包括中间件转发,这样就可以无缝的连接到原来的项目中。
  • 本文仍然使用python flask来演示整个过程,并详细描述了这个过程的设计流程。
    本系统所涉及的源码已打包上传。
    文中源码文件【获取方式】:关注公众号:利哥AI实例探险
    给公众号发送 “http传输安全保密” 获取下载方式
    注意发送的关键词不能错,否则匹配不到对应资源,由于本人能力有限,难免有疏漏之处。

2. 项目中经常遇到的问题及处理办法

  1. http协议数据传输,需采用校验码技术或密码技术保证重要数据在传输过程中的完整性。
  2. 鉴别信息及重要业务数据采用经国家密码主管部门认可的密码技术,保证其在传输过程中数据的保密性。

**数据的保密性:**使用国家密码管理局认可的对称加密算法(如AES)来确保数据的保密性
**数据的完整性:**可以使用HMAC(哈希消息认证码)来确保数据在传输过程中的完整性

**AES(Advanced Encryption Standard,高级加密标准)**是一种对称加密算法,用于保护数据的安全。它被广泛应用于各种数据加密场景,包括文件加密、网络通信加密等。AES以其高效性和强大的安全性成为现代数据加密的主流选择。
**哈希消息认证码(HMAC)**是一种用于验证消息完整性和真实性的技术。它结合了哈希函数和密钥,确保消息在传输过程中未被篡改。

3. 处理的问题及处理的流程

  1. 请求、接收双方共用aes key及hmac key。
  2. 请求方在发送请求时,对发送的原始数据进行aes加密,并计算哈希值
  3. 接收方在接到数据后,进行哈希值验证,以确保数据完整性,之后进行aes解密处理
  4. 接收方根据解密后的数据进行各种业务处理,在返回给请求方时,将返回数据进行aes加密,并计算哈希值
  5. 请求方在接收到请求返回时,进行哈希验证并进行aes解密,并进行业务处理
    在这里插入图片描述

4. 示例代码拆解

4.1 AES加解密

文章末尾会附带全部源码,以下为讲解
加密函数:

  • 输入:需要加密的原始数据字符串、aes key、随机生成的iv
  • 输出:加密后的数据串、随机生成的iv(供数据发送给接收方进行解密使用)

解密函数:

  • 输入:加密后的数据字符串、aes key、加密时的iv
  • 输出:解密后的原始数据
def generate_iv_16str() -> str:
    # 生成一个16字符长度的字符串作为IV
    return ''.join(random.choices(string.ascii_letters + string.digits, k=BLOCK_SIZE))

def encrypt_aes_str(plaintext, key, iv_str) -> Tuple[str, str]:
    # 将IV字符串转换为字节
    iv = iv_str.encode('utf-8')
    # 创建AES对象
    cipher = AES.new(key, AES.MODE_CBC, iv)
    # 加密并进行填充
    ciphertext = cipher.encrypt(pad(plaintext.encode('utf-8'), BLOCK_SIZE))
    # 将密文进行Base64编码
    encrypted_data = base64.b64encode(ciphertext).decode('utf-8')
    return encrypted_data, iv_str

def decrypt_aes_str(ciphertext: str, key: str, iv_str: str) -> str:
    # 将IV字符串转换为字节
    iv = iv_str.encode('utf-8')
    # 将密文解码为字节数组
    ciphertext = base64.b64decode(ciphertext)
    # 创建AES对象
    cipher = AES.new(key, AES.MODE_CBC, iv)
    # 解密并去除填充
    plaintext = unpad(cipher.decrypt(ciphertext), BLOCK_SIZE)
    return plaintext
    
if __name__ == '__main__':
    # 加密数据
    response_json = {"msg": "hello world"}
    data_str = json.dumps(response_json)
    # 加密
    encrypted_data, iv_str = encrypt_aes_str(data_str, SECRET_KEY, generate_iv_16str())
    # 解密
    body_str = decrypt_aes_str(encrypted_data, SECRET_KEY, iv_str)

4.2 哈希验证码生成

输入:加密后的字符串
输出:哈希验证码

# HMAC生成函数
def generate_hmac(data: str) -> str:
    return hmac.new(HMAC_KEY, data.encode('utf-8'), hashlib.sha256).hexdigest()

4.3 中间件处理

中间件前置处理函数:

  • 在中间件中解析数据,做数据完整性的哈希码认证
  • 在中间件中根据iv解析aes加密后的数据,再将这部分数据写入到中间件,供视图函数使用
  • 在视图函数中,进行业务处理
app = Flask(__name__)
app.before_request(before_request_func)
app.after_request(after_request_func)

def before_request_func():
    url_method = request.method.upper()
    url_str = request.url
    ip_address = request.remote_addr
    if 'OPTIONS' == url_method:
        return None
    if 'GET' == url_method:
        return None
    if 'POST' != url_method:
        return None    
    requestData = request.get_json()
    recv_iv = requestData.get('vector')
    recv_hmac = requestData.get('code')
    recv_data = requestData.get('data')
    body_str = decrypt_aes_str(recv_data, SECRET_KEY, recv_iv)
    # 验证HMAC
    calculate_hmac = generate_hmac(recv_data)
    if not hmac.compare_digest(calculate_hmac, recv_hmac):
        # 哈希消息验证码校验失败
        return jsonify({'error': 'HMAC verification failed'}), 400
    # 将修改后的数据重新赋值给request.data
    request._cached_data = body_str
    return None

中间件后置处理函数:

  • 视图函数处理完业务逻辑后,会将返回结果返回到中间件
  • 中间件需要对返回数据进行加密,再带上iv进行返回
app = Flask(__name__)
app.before_request(before_request_func)
app.after_request(after_request_func)
   
def after_request_func(response):
    url_str = request.url
    url_method = request.method.upper()
    if 'OPTIONS' == url_method:
        return response
    elif 'GET' == url_method:
        pass
    elif 'POST' == url_method:
        pass    
    response_json = response.get_json()
    # 加密数据
    data_str = json.dumps(response_json)
    encrypted_data, iv_str = encrypt_aes_str(data_str, SECRET_KEY, generate_iv_16str())
    # 生成HMAC
    hmac_value = generate_hmac(encrypted_data)
    response_data = {
        'vector': iv_str,
        'data': encrypted_data,
        'code': hmac_value
    }
    response.data = json.dumps(response_data)
    response.mimetype = 'application/json'
    return response

5. 整体代码示例及逻辑

5.1 整体源码

from flask import Flask, request, jsonify
import hmac, hashlib, base64, json, string, random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from typing import Tuple
app = Flask(__name__)
'''
AES
    CBC、BLOCK_SIZE=16、PKCS7填充
    使用Crypto.Util.Padding中的pad和unpad函数进行PKCS7填充和去填充。这里使用块大小BLOCK_SIZE(16字节, 即128位)进行填充。
    偏移量 (IV) 的长度是有要求的。
    对于AES (高级加密标准) 来说, IV的长度必须与AES的块大小相同, 这意味着IV的长度必须是16字节(128位),因为AES的块大小是128位。
'''
# 配置参数
# 共享密钥和HMAC密钥
SECRET_KEY = b'synjones2024zhc1'  # 16字节,即128位密钥  确保是16个字符, 避免引起不必要的异常
HMAC_KEY = b'synjones2024zhc2'  # 16字节,即128位HMAC密钥
BLOCK_SIZE = 16

def generate_iv_16str() -> str:
    # 生成一个16字符长度的字符串作为IV
    return ''.join(random.choices(string.ascii_letters + string.digits, k=BLOCK_SIZE))

def encrypt_aes_str(plaintext: str, key: str, iv_str: str) -> Tuple[str, str]:
    # 将IV字符串转换为字节
    iv = iv_str.encode('utf-8')
    # 创建AES对象
    cipher = AES.new(key, AES.MODE_CBC, iv)
    # 加密并进行填充
    ciphertext = cipher.encrypt(pad(plaintext.encode('utf-8'), BLOCK_SIZE))
    # 将密文进行Base64编码
    encrypted_data = base64.b64encode(ciphertext).decode('utf-8')
    return encrypted_data, iv_str

def decrypt_aes_str(ciphertext: str, key: str, iv_str: str) -> str:
    # 将IV字符串转换为字节
    iv = iv_str.encode('utf-8')
    # 将密文解码为字节数组
    ciphertext = base64.b64decode(ciphertext)
    # 创建AES对象
    cipher = AES.new(key, AES.MODE_CBC, iv)
    # 解密并去除填充
    plaintext = unpad(cipher.decrypt(ciphertext), BLOCK_SIZE)
    return plaintext

# HMAC生成函数
def generate_hmac(data: str) -> str:
    return hmac.new(HMAC_KEY, data.encode('utf-8'), hashlib.sha256).hexdigest()

# 定义中间件处理请求数据
def before_request_func():
    url_method = request.method.upper()
    url_str = request.url
    ip_address = request.remote_addr
    print(f'---------------------------------(starting:)server receive, method={url_method}, url={url_str}, remote_addr={ip_address}')
    if 'OPTIONS' == url_method:
        return None
    if 'GET' == url_method:
        return None
    if 'POST' != url_method:
        return None
    
    # 不需要任何验证的请求
    if url_str.find("/send") > 0 \
        or url_str.find("/receive") > 0 \
        or url_str.find("/login") > 0:
        return None
    
    requestData = request.get_json()
    recv_iv = requestData.get('vector')
    recv_hmac = requestData.get('code')
    recv_data = requestData.get('data')
    body_str = decrypt_aes_str(recv_data, SECRET_KEY, recv_iv)
    print('-------------body_str=', body_str)
    # 验证HMAC
    # calculate_hmac = generate_hmac(recv_data)
    # if not hmac.compare_digest(calculate_hmac, recv_hmac):
    #     # 哈希消息验证码校验失败
    #     return jsonify({'error': 'HMAC verification failed'}), 400
    # 将修改后的数据重新赋值给request.data
    request._cached_data = body_str
    return None

# 定义中间件处理响应数据
def after_request_func(response):
    url_str = request.url
    url_method = request.method.upper()
    if 'OPTIONS' == url_method:
        return response
    elif 'GET' == url_method:
        pass
    elif 'POST' == url_method:
        pass
    
        # 不需要任何验证的请求
    if url_str.find("/send") > 0 \
        or url_str.find("/receive") > 0 \
        or url_str.find("/login") > 0:
        return response
    
    response_json = response.get_json()
    # 加密数据
    data_str = json.dumps(response_json)
    encrypted_data, iv_str = encrypt_aes_str(data_str, SECRET_KEY, generate_iv_16str())
    # 生成HMAC
    hmac_value = generate_hmac(encrypted_data)
    response_data = {
        'vector': iv_str,
        'data': encrypted_data,
        'code': hmac_value
    }
    response.data = json.dumps(response_data)
    response.mimetype = 'application/json'
    return response

app.before_request(before_request_func)
app.after_request(after_request_func)

# 对json数据进行aes加密,并生成hmac
@app.route('/send', methods=['POST'])
def send_data():
    content = request.json
    data_str = json.dumps(content) # 将JSON对象序列化为字符串
    # 加密数据
    encrypted_data, iv_str = encrypt_aes_str(data_str, SECRET_KEY, generate_iv_16str())
    # 生成HMAC
    hmac_value = generate_hmac(encrypted_data)
    response = {
        'vector': iv_str,
        'data': encrypted_data,
        'code': hmac_value
    }
    return jsonify(response)

# 对aes加密的数据进行hmac验证,并解析aes加密的数据
@app.route('/receive', methods=['POST'])
def receive_data():
    content = request.json
    iv = content.get('vector')
    encrypted_data = content.get('data')
    received_hmac = content.get('code')

    # 验证HMAC
    expected_hmac = generate_hmac(encrypted_data)
    if not hmac.compare_digest(expected_hmac, received_hmac):
        return jsonify({'error': 'HMAC verification failed'}), 400

    # 解密数据
    decrypted_data_str = decrypt_aes_str(encrypted_data, SECRET_KEY, iv)
    if decrypted_data_str is None:
        return jsonify({'error': 'Decryption failed'}), 400

    # 将解密后的字符串反序列化为JSON对象
    decrypted_data = json.loads(decrypted_data_str)
    
    return jsonify({'data': decrypted_data})

@app.route('/encrypt', methods=['POST'])
def encrypt():
    j_data = json.loads(request.data)
    print('-------------路由接收:', j_data)
    return jsonify(j_data)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

5.2 源码接口

  1. 对json进行加密(为了单独测试加密)
    在这里插入图片描述

  2. 对加密数据进行解析(为了单独测试解密)

在这里插入图片描述
3. 使用中间件流程(对应整体流程),即流程设计图中的流程,此接口是可以提供给请求端调用的接口
在这里插入图片描述

如果您觉得我分享的这些对您有用,请点击原文,关注我吧

原文地址:http数据传输确保完整性和保密性整流程方案(含源码)


网站公告

今日签到

点亮在社区的每一天
去签到