python的mitmproxy模块实现简单WAF框架

发布于:2025-05-17 ⋅ 阅读:(16) ⋅ 点赞:(0)
from mitmproxy import http
from urllib.parse import urlparse, parse_qs
import traceback

class MyRequest:
    def __init__(self,req:http.Request):
        self.raw_req=req
        self.headers={}
        self.method=req.method
        q=urlparse(req.url)
        #print(q,q.path)
        self.path=q.path
        self.params=parse_qs(q.query)
        self.protocol=q.scheme
        self.headers={k:req.headers.get_all(k) for k in req.headers}
        

_CL=[]

_RL=[]

def runcl(flow: http.HTTPFlow):
    try:
        for f in _CL:
            req=MyRequest(flow.request)
            r=f(req)
            if r:
                flow.response=http.Response.make(403,r)
                break
    except Exception as e:
        print(e)
        traceback.print_exception(e)
        flow.response=http.Response.make(403,'异常')

def runrl(res:http.Response):
    try:
        for f in _RL:
            r=f(res)
            if r:
                r:str
                res.content=r.encode('utf-8')
                res.status_code=403
                break
    except Exception as e:
        print(e)
        traceback.print_exception(e)
        res.content='异常'.encode('utf-8')
        res.status_code=403

def rule(f):
    _CL.append(f)

def after(f):
    _RL.append(f)

def request(flow: http.HTTPFlow):
    runcl(flow)

def response(flow: http.HTTPFlow):
    runrl(flow.response)


"""下面是demo"""


FIXS=['.php','.html','.js','.css','.jpg','.jpeg','.png','.mp4','.mp3','']
@rule
def check_fix(req:MyRequest):
    k=len(req.path)
    n=k
    for i in range(k-1,0,-1):
        c=req.path[i]
        if c=='.':
            n=i
            break
        if c=='/':
            break
    fix=req.path[n:]
    if not fix:
        return
    if not fix in FIXS:
        return '后缀异常:'+fix


@rule
def check_post(req:MyRequest):
    print(req.method)
    if req.method=='POST':
        print(req.raw_req.content)
        if b'a' in req.raw_req.content:
            return 'no hack'
    
    
@rule
def check_password(req:MyRequest):
    ps=req.params.get('password')
    if ps:
        p=ps[0]
        if len(p)<5:
            return '密码太短'
        
@after
def check01(res:http.Response):
    if '错误'.encode('utf-8') in res.content:
        return '屏蔽错误信息'

像这样启动模块

mitmdump -s proxy.py -p 80 --mode reverse:http://127.0.0.1:8080


网站公告

今日签到

点亮在社区的每一天
去签到