https改造-python https 改造

发布于:2024-07-25 ⋅ 阅读:(99) ⋅ 点赞:(0)

前言

  如果您觉得有用的话,记得给博主点个赞,评论,收藏一键三连啊,写作不易啊^ _ ^。
  而且听说点赞的人每天的运气都不会太差,实在白嫖的话,那欢迎常来啊!!!


https改造-python https 改造

这里说一下要改造的内容:
1、https 配置信任库;
2、客户端带证书发送 https 请求,服务端关闭主机名、ip 验证;

代码结构,下图红框处:
在这里插入图片描述

1. https 配置信任库

SslConfig.py:

import ssl

def _addSsl(cert, key, hundle):
    """Build a server-side SSL context that requires a client certificate.

    Args:
        cert: Path to the certificate chain file presented by this server.
        key: Path to the private key file matching ``cert``.
        hundle: Path to the CA file used to verify the peer's certificate.

    Returns:
        The configured ``ssl.SSLContext``.
    """
    server_ctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    server_ctx.load_cert_chain(cert, key)
    server_ctx.load_verify_locations(hundle)
    # Hostname matching stays off; the peer must still present a certificate
    # that validates against the CA file loaded above.
    server_ctx.check_hostname = False
    server_ctx.verify_mode = ssl.CERT_REQUIRED
    return server_ctx


import configparser
import logging
import pathlib
from common.SslConfig import _addSsl

from flask import Flask

# Flask application object; the route below is registered on it.
app = Flask(__name__)
# SSL configuration: the certificate, private key and CA bundle are looked up
# in the "ssl" directory that sits next to this file.
current_dir = pathlib.Path(__file__).parent
cert = current_dir / 'ssl' / 'psbc_crt.pem'
key = current_dir / 'ssl' / 'psbc_key.pem'
bundle = current_dir / 'ssl' / 'psbc_full.pem'
# Built once at import time and handed to app.run() in the __main__ guard.
context = _addSsl(cert, key, bundle)


@app.route('/data')
def data():
    """Return the fixed greeting string for the ``/data`` route."""
    greeting = 'hello world'
    return greeting


if __name__ == '__main__':
    # Start Flask's built-in server on all interfaces, port 8007, using the
    # SSL context built above so the app is served over HTTPS.
    app.run(host='0.0.0.0', port=8007, ssl_context=context)

测试:
访问:
https://127.0.0.1:8007/data
在这里插入图片描述
在这里插入图片描述

2. 客户端带证书发送 https 请求,服务端关闭主机名、ip 验证

SslConfig.py:

import ssl
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context


# SSL 配置


class SSLAdapter(HTTPAdapter):
    """HTTPAdapter whose pool manager is created with a custom SSL context.

    The context optionally carries a client certificate/key pair and a CA
    file, and has both hostname checking and certificate verification
    switched off.
    """

    def __init__(self, cert=None, key=None, bundle=None, **kwargs):
        """Remember the certificate paths, then initialise the base adapter.

        Args:
            cert: Optional path to the client certificate chain.
            key: Optional path to the client private key.
            bundle: Optional path to a CA file.
            **kwargs: Forwarded unchanged to ``HTTPAdapter.__init__``.
        """
        # Kept ahead of super().__init__() exactly as in the original --
        # presumably the base initialiser triggers init_poolmanager(), which
        # reads these attributes; verify against the installed requests.
        self.cert, self.key, self.bundle = cert, key, bundle
        super().__init__(**kwargs)

    def init_poolmanager(self, *args, **kwargs):
        """Create the pool manager with this adapter's SSL context injected."""
        tls = create_urllib3_context()
        # A client certificate is loaded only when both halves were supplied.
        if self.cert and self.key:
            tls.load_cert_chain(certfile=self.cert, keyfile=self.key)
        if self.bundle:
            tls.load_verify_locations(cafile=self.bundle)
        # Hostname checking has to be turned off first: the ssl module rejects
        # CERT_NONE while check_hostname is still enabled.
        tls.check_hostname = False
        tls.verify_mode = ssl.CERT_NONE  # peer certificate is not verified
        kwargs.update(ssl_context=tls)
        return super().init_poolmanager(*args, **kwargs)


def _addSsl(cert, key, hundle):
    """Create the SSL context used by the HTTPS server.

    The context is built for the ``CLIENT_AUTH`` purpose, loads this side's
    certificate and key, trusts the given CA file, and requires the peer to
    present a certificate. Hostname checking is disabled.

    Args:
        cert: Path to the certificate chain file.
        key: Path to the matching private key file.
        hundle: Path to the CA file used for peer verification.

    Returns:
        The configured ``ssl.SSLContext``.
    """
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ctx.load_cert_chain(keyfile=key, certfile=cert)
    ctx.load_verify_locations(cafile=hundle)
    ctx.check_hostname = False  # no hostname matching
    ctx.verify_mode = ssl.CERT_REQUIRED  # peer must present a valid certificate
    return ctx


def _ssl_session(cert, key, hundle):
    """Return a ``requests.Session`` whose HTTPS traffic goes through SSLAdapter.

    Args:
        cert: Path to the client certificate chain.
        key: Path to the client private key.
        hundle: Path to the CA file handed to the adapter as its bundle.

    Returns:
        A session with certificate verification disabled and an
        ``SSLAdapter`` mounted for every ``https://`` URL.
    """
    https_session = requests.Session()
    https_session.verify = False  # certificate verification is turned off
    https_session.mount(
        'https://',
        SSLAdapter(cert=cert, key=key, bundle=hundle),
    )
    return https_session


LogConfig.py

import logging
from logging import handlers


def _logging(log, f_name, backup_count, level_tag):
    """Attach a daily-rotating file handler and a console handler to ``log``.

    Args:
        log: The ``logging.Logger`` to configure.
        f_name: Path of the log file to write.
        backup_count: Number of rotated log files to keep.
        level_tag: Level name: ``'DEBUG'``, ``'INFO'``, ``'WARNING'``,
            ``'ERROR'`` or ``'CRITICAL'`` (case-insensitive). Any other
            value falls back to ``DEBUG``.

    Returns:
        The same ``log`` object, now carrying both handlers.
    """
    # Resolve every standard level name. Previously only 'INFO' was
    # recognised, so WARNING/ERROR/CRITICAL silently became DEBUG.
    known_levels = {
        'DEBUG': logging.DEBUG,
        'INFO': logging.INFO,
        'WARNING': logging.WARNING,
        'ERROR': logging.ERROR,
        'CRITICAL': logging.CRITICAL,
    }
    level = known_levels.get(str(level_tag).strip().upper(), logging.DEBUG)

    # One formatter shared by both handlers (named to avoid shadowing the
    # ``format`` builtin).
    datefmt = '[%Y-%m-%d %H:%M:%S]'
    fmt = '%(asctime)s - [%(levelname)s] - %(filename)s [line:%(lineno)d] - %(message)s'
    formatter = logging.Formatter(fmt, datefmt)

    # File handler: rolls over once per day and keeps ``backup_count`` files.
    file_handler = handlers.TimedRotatingFileHandler(
        filename=f_name,
        when='D',
        interval=1,
        backupCount=backup_count,
        encoding='utf-8',
    )
    # Console handler: same format and threshold as the file handler.
    console_handler = logging.StreamHandler()

    for handler in (file_handler, console_handler):
        handler.setFormatter(formatter)
        handler.setLevel(level)
        log.addHandler(handler)

    log.setLevel(level)
    return log

httpsDemo.py

import configparser
import logging
import pathlib
from common.LogConfig import _logging
from common.SslConfig import _addSsl,_ssl_session

import requests
from flask import Flask

# Flask application object; the routes below are registered on it.
app = Flask(__name__)
# All paths are resolved relative to the directory containing this file.
current_dir = pathlib.Path(__file__).parent
config_file = current_dir / 'config' / 'httpsDemo.ini'
config = configparser.ConfigParser()
# Opened explicitly so the INI file is always decoded as UTF-8.
with open(config_file, 'r', encoding='utf-8') as f:
    config.read_file(f)

# Logging settings come from the [logging] section; 'log_file' is required,
# the other two options have fallbacks.
log_file = current_dir / config['logging']['log_file']
log_level = config['logging'].get('log_level', 'INFO').upper()
log_back_up_days = config['logging'].getint('log_back_up_days', 5)  # falls back to 5 when the option is absent

# Initialise logging: make sure the log directory exists, then attach handlers.
log_file.parent.mkdir(parents=True, exist_ok=True)
log = logging.getLogger(__name__)
logger = _logging(log, log_file, log_back_up_days, log_level)
# Read from the [params] section (fallback 0); not referenced again in the
# visible part of this file.
https_flag = config['params'].getint('https_flag', 0)

# SSL configuration: certificate, private key and CA bundle live in the "ssl"
# directory next to this file.
cert = current_dir / 'ssl' / 'psbc_crt.pem'
key = current_dir / 'ssl' / 'psbc_key.pem'
bundle = current_dir / 'ssl' / 'psbc_full.pem'
# `context` is handed to app.run() below; `session` is used by index() for
# outgoing HTTPS requests.
context = _addSsl(cert, key, bundle)
session = _ssl_session(cert, key, bundle)


@app.route('/data')
def data():
    """Log a marker line for the request and return the fixed greeting."""
    greeting = 'hello world'
    logger.info('======================== psot data hello world')
    return greeting


@app.route('/index')
def index():
    """Call the downstream HTTPS endpoint and relay its response body.

    Returns:
        The response text of the downstream call, or the literal string
        ``'error'`` when the request fails for any reason.
    """
    msg = 'error'
    url = 'https://127.0.0.1:8180/api/https/a'
    logger.info(f'======================== get:{url}')
    try:
        # Sent through the module-level `session` built by _ssl_session().
        # The timeout (seconds) stops an unresponsive upstream from blocking
        # this request handler forever; requests waits indefinitely by default.
        response = session.get(url, timeout=10)
        msg = response.text
    except Exception:
        # Deliberately best-effort: the route still answers with 'error'.
        # logger.exception records the traceback, which logger.error(e) lost.
        logger.exception(f'request to {url} failed')
    logger.info(f'response:{msg}')
    return msg


if __name__ == '__main__':
    # Start Flask's built-in server on all interfaces, port 8007, using the
    # SSL context built above so the app is served over HTTPS.
    app.run(host='0.0.0.0', port=8007, ssl_context=context)


网站公告

今日签到

点亮在社区的每一天
去签到