自动化运维实验(二)---自动识别设备,并导出配置

发布于:2025-08-15 ⋅ 阅读:(13) ⋅ 点赞:(0)

 目录

一、实验拓扑

二、实验目的

三、实验步骤

实验思路:

代码部分:

四、实验结果:


一、实验拓扑

二、实验目的

ssh远程登录后,识别设备类型(华三、华为、锐捷、山石、飞塔、深信服等),再输入对应设备的命令进行配置导出

三、实验步骤

实验开始之前先搭好环境,测试无误再开始

实验思路:

利用ip.txt,存放需要登录的设备IP

再一台一台登录,更具设备独有的命令进行识别,再对配置进行导出保存。

代码部分:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量导出设备配置(改进版)
- 在探测/关闭分页阶段不保存任何输出
- 仅从首次成功的“导出配置”命令开始保存 raw 输出,并对其进行清理后保存 clean 输出
- 支持自动翻页、错误判断、厂商识别等
- 输出文件:
    output/<ip>_<vendor>_<ts>_raw.txt   (可选)
    output/<ip>_<vendor>_<ts>_clean.txt
配置项可在脚本顶部调整。
"""
import paramiko
import time
import re
import os
import sys
from getpass import getpass
from datetime import datetime

# ---------- 配置区域(可按需修改) ----------
IP_FILE = "ip.txt"
OUTPUT_DIR = "output"
SSH_PORT = 22
CONNECT_TIMEOUT = 10
CMD_TIMEOUT = 6.0
READ_CHUNK = 65536

# 是否只保存 clean(True)或同时保存 raw(False)
SAVE_CLEAN_ONLY = True

VENDOR_KEYWORDS = {
    "huawei": ["huawei", "huawei vrp", "huawei Technologies", "Huawei"],
    "h3c": ["h3c", "h3c technologies", "Comware", "H3C"],
    "cisco": ["cisco", "ios", "cisco ios", "Cisco IOS Software"],
    "ruijie": ["ruijie", "ruijie networks", "rzos", "Ruijie"],
    "fortigate": ["fortigate", "fortios", "fortinet", "FortiGate"],
    "hillstone": ["hillstone", "hillstone networks", "shanshi", "HS"],
    "sangfor": ["sangfor", "深信服"],
}

# 每个厂商尝试的“导出配置”命令列表
VENDOR_CONFIG_CMDS = {
    "cisco": ["terminal length 0", "show running-config"],
    "huawei": ["display current-configuration", "screen-length 0 temporary", "display this-configuration"],
    "h3c": ["display current-configuration", "display this-configuration", "screen-length 0 temporary"],
    "ruijie": ["display current-configuration", "show running-config", "screen-length 0 temporary"],
    "fortigate": ["show full-configuration", "get system status", "show running-config", "show"],
    "hillstone": ["display current-configuration", "show running-config", "terminal length 0"],
    "sangfor": ["display current-configuration", "show running-config"],
    # fallback
    "unknown": ["display current-configuration", "show running-config", "show full-configuration", "show"]
}

# 一些通用的关闭分页命令(尝试但不将其直接视为导出成功)
PAGING_CMDS = [
    "terminal length 0",
    "screen-length 0 temporary",
    "screen-length disable",
    "page 0",
    "set cli pagination off",
    "no page",
    "undo page",
]

# ---------- 正则/判定模式 ----------
_ERR_PAT = re.compile(
    r"unrecognized command|unknown command|invalid input|command not found|% Unrecognized|% Invalid|% Unknown|^%|[\^]\s*$",
    re.IGNORECASE,
)
_PAGING_PAT = re.compile(
    r"--More--|-- MORE --|--More--|\[More\]|Press any key|<--- More --->|--More--|More:|\(q\)|\-\-more\-\-",
    re.IGNORECASE,
)
_CONFIG_HINTS = re.compile(
    r"\b(hostname|sysname|interface|system-view|vlan|ip address|current-configuration|running-config|service password-encryption|ntp server|snmp-server|boot-image|bootrom|startup-config|device name)\b",
    re.IGNORECASE,
)

# ---------- 辅助函数 ----------
def read_ip_file(path):
    ips = []
    if not os.path.exists(path):
        print("ip 文件不存在:", path)
        return ips
    with open(path, "r", encoding="utf-8") as f:
        for ln in f:
            ln = ln.strip()
            if not ln or ln.startswith("#"):
                continue
            parts = [p.strip() for p in ln.split(",")]
            if len(parts) == 1:
                ips.append((parts[0], None, None))
            elif len(parts) >= 3:
                ips.append((parts[0], parts[1], parts[2]))
            else:
                ips.append((parts[0], parts[1] if len(parts) > 1 else None, None))
    return ips

def ensure_output_dir(path):
    if not os.path.exists(path):
        os.makedirs(path, exist_ok=True)

def timestamp():
    return datetime.now().strftime("%Y%m%d_%H%M%S")

def open_ssh_client(ip, username, password, port=SSH_PORT, timeout=CONNECT_TIMEOUT):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(ip, port=port, username=username, password=password, timeout=timeout, look_for_keys=False, allow_agent=False)
    return client

def recv_all_from_shell(shell, timeout=CMD_TIMEOUT, pause=0.2):
    """
    非阻塞读取:读取直到超时段内没有新数据。
    """
    output = b""
    end_time = time.time() + timeout
    while True:
        while shell.recv_ready():
            try:
                chunk = shell.recv(READ_CHUNK)
                if not chunk:
                    break
                output += chunk
                end_time = time.time() + timeout
            except Exception:
                break
        if time.time() > end_time:
            break
        time.sleep(pause)
    try:
        return output.decode('utf-8', errors='ignore')
    except Exception:
        return output.decode('latin1', errors='ignore')

def send_cmd_and_recv(shell, cmd, cmd_timeout=CMD_TIMEOUT, short_sleep=0.2):
    """
    发送命令并收集输出(支持自动翻页)。
    返回 (out_str, error_str)
    """
    if not cmd.endswith("\n"):
        to_send = cmd + "\n"
    else:
        to_send = cmd
    try:
        shell.send(to_send)
    except Exception as e:
        return "", "send_error: " + str(e)
    time.sleep(short_sleep)
    out = recv_all_from_shell(shell, timeout=cmd_timeout)
    # 如果输出包含分页提示则自动翻页(发送空格)
    if out and _PAGING_PAT.search(out):
        accum = out
        for i in range(300):  # 上限,避免无限循环
            try:
                shell.send(" ")
            except Exception:
                break
            time.sleep(0.12)
            more = recv_all_from_shell(shell, timeout=1.0)
            if not more:
                break
            accum += more
            # 若本次返回不再包含分页提示,结束
            if not _PAGING_PAT.search(more):
                break
        out = accum
    return out, None

def try_disable_paging(shell):
    """
    尝试发送分页关闭命令(不保存这些输出)。
    返回拼接的输出字符串(仅用于本地判断/日志),但调用方不会把它保存为主配置文件。
    """
    accum = []
    for c in PAGING_CMDS:
        try:
            out, err = send_cmd_and_recv(shell, c, cmd_timeout=1.0)
            accum.append(f"CMD: {c}\n{out or ''}\nerr:{err}")
        except Exception as e:
            accum.append(f"CMD: {c}\nEXCEPTION: {e}")
    return "\n".join(accum)

def detect_vendor_from_text(text):
    if not text:
        return None
    tl = text.lower()
    for vendor, keys in VENDOR_KEYWORDS.items():
        for k in keys:
            if k.lower() in tl:
                return vendor
    return None

def try_commands_and_collect(shell, cmds, cmd_timeout=CMD_TIMEOUT):
    """
    依次尝试 cmds:
    - 对于明显用于关闭分页/切换上下文的命令(非配置导出命令),发送后丢弃输出
    - 对于疑似配置导出命令(包含 display/show running-config/current-configuration 等关键词),发起捕获并返回 raw 输出
    返回 (successful_cmd_or_None, raw_output_or_None, tried_cmd_list)
    tried_cmd_list 为命令字符串列表(仅便于 clean 函数去除回显)
    """
    tried = []
    for c in cmds:
        tried.append(c)
        lower = c.strip().lower()
        # 判断是否可能为配置导出命令(较宽的匹配)
        if any(k in lower for k in ("display current-configuration", "display this-configuration", "show running-config", "show full-configuration", "show configuration", "show config", "show running", "show full")):
            # 将该命令视为配置导出命令,开始 capture
            out, err = send_cmd_and_recv(shell, c, cmd_timeout=max(cmd_timeout, 10.0))
            if out and out.strip():
                return c, out, tried
            # 若没有拿到输出,继续尝试下一个命令
            continue
        # 否则把它当成分页关闭或上下文切换之类的命令,发送但不当作最终配置输出
        try:
            send_cmd_and_recv(shell, c, cmd_timeout=1.5)
        except Exception:
            # 忽略错误,继续尝试下一条命令
            pass
        time.sleep(0.12)
    # 如果循环结束仍未明确成功,则返回 None,caller 会再尝试 fallback
    return None, None, tried

def clean_config_output_v2(text, successful_cmd=None, tried_cmds=None):
    """
    更强力的配置清理函数(原样复用并允许传入 tried_cmds 以去掉回显)
    """
    if not text:
        return ""
    # 1. 去掉 ANSI 控制码
    t = re.sub(r'\x1B\[[0-?]*[ -/]*[@-~]', '', text)
    # 2. 统一换行并按行处理
    t = t.replace('\r\n', '\n').replace('\r', '\n')
    lines = t.split('\n')
    # 3. 构建一些便捷匹配集合
    tried_set = set()
    if tried_cmds:
        for c in tried_cmds:
            if c:
                tried_set.add(c.strip().lower())
    if successful_cmd:
        tried_set.add(successful_cmd.strip().lower())
    # helper 判断是否是 prompt 行(像 <SW6> 或 hostname> 或 hostname#)
    def is_prompt_line(l):
        s = l.strip()
        if not s:
            return False
        # <SW6>
        if re.match(r'^<[^<>]+>$', s):
            return True
        # name> or name# (单独的提示符)
        if re.match(r'^[\w\-\._]+[>#]$', s):
            return True
        return False
    # helper 判断是否是命令回显(完全等于某个尝试过的命令)
    def is_cmd_echo(l):
        s = l.strip().lower()
        if s in tried_set:
            return True
        for cmd in tried_set:
            if cmd and s.startswith(cmd) and len(s) <= len(cmd) + 6:
                return True
        return False
    # helper 判断是否是错误/噪音行
    def is_error_line(l):
        s = l.strip()
        if not s:
            return False
        if s.startswith('%') or s == '^' or s.startswith('^'):
            return True
        if re.search(r'unrecognized command|invalid input|command not found|Unrecognized', s, re.IGNORECASE):
            return True
        return False
    # helper 判断是否为“装饰性行”(大量符号)
    def is_decorative(l):
        s = l.strip()
        if not s:
            return False
        if len(s) >= 10 and (re.sub(r'[\W_]', '', s) == '' or (sum(ch.isalnum() for ch in s) / len(s)) < 0.2):
            return True
        if re.search(r'copyright', s, re.IGNORECASE) and len(s) < 200:
            return True
        return False
    # 4. 先去掉明显噪声行(但保持行索引,以便后续定位 config 起点)
    cleaned_lines = []
    for ln in lines:
        if is_prompt_line(ln):
            continue
        if is_cmd_echo(ln):
            continue
        if is_error_line(ln):
            continue
        if is_decorative(ln):
            continue
        cleaned_lines.append(ln)
    # 5. 在 cleaned_lines 里寻找配置起点
    cfg_start_keywords = [
        re.compile(r'^\s*#\s*$', re.IGNORECASE),
        re.compile(r'^\s*version\b', re.IGNORECASE),
        re.compile(r'^\s*sysname\b', re.IGNORECASE),
        re.compile(r'^\s*hostname\b', re.IGNORECASE),
        re.compile(r'^\s*interface\b', re.IGNORECASE),
        re.compile(r'current-configuration', re.IGNORECASE),
        re.compile(r'running-config', re.IGNORECASE),
        re.compile(r'^\s*!', re.IGNORECASE),
    ]
    start_idx = None
    for i, ln in enumerate(cleaned_lines):
        for p in cfg_start_keywords:
            if p.search(ln):
                if p.pattern == r'^\s*#\s*$':
                    if i + 1 < len(cleaned_lines):
                        nxt = cleaned_lines[i + 1]
                        if re.search(r'^\s*version\b|^\s*sysname\b|^\s*hostname\b|^\s*interface\b|current-configuration|running-config', nxt, re.IGNORECASE):
                            start_idx = i
                            break
                        else:
                            start_idx = i
                            break
                    else:
                        start_idx = i
                        break
                else:
                    start_idx = i
                    break
        if start_idx is not None:
            break
    # 6. 如果没找到起点,尝试根据 CONFIG_HINTS 的 regex 找第一个出现位置
    if start_idx is None:
        joined = '\n'.join(cleaned_lines)
        m = _CONFIG_HINTS.search(joined)
        if m:
            pos = m.start()
            up_to = joined[:pos]
            start_idx = up_to.count('\n')
        else:
            out = '\n'.join(l for l in cleaned_lines).strip()
            out = re.sub(r'\n\s*\n+', '\n\n', out)
            return out
    # 7. 从 start_idx 开始取后续行,并再做最后清理(去掉行首/尾多余空格,去掉连续多空行)
    final_lines = cleaned_lines[start_idx:]
    while final_lines and not final_lines[0].strip():
        final_lines.pop(0)
    final_lines2 = []
    for ln in final_lines:
        if is_prompt_line(ln):
            continue
        if is_error_line(ln):
            continue
        final_lines2.append(ln.rstrip())
    out = '\n'.join(final_lines2).strip()
    out = re.sub(r'\n\s*\n+', '\n\n', out)
    return out

def save_output(ip, vendor, raw_text, cleaned_text, prefix=OUTPUT_DIR):
    """
    保存文件:如果 SAVE_CLEAN_ONLY 为 True 则仅保存 cleaned_text,否则保存 raw + clean
    返回已保存的主路径(clean 的路径)
    """
    ensure_output_dir(prefix)
    ts = timestamp()
    safe_vendor = vendor if vendor else "unknown"
    base = f"{ip}_{safe_vendor}_{ts}"
    clean_path = os.path.join(prefix, base + "_clean.txt")
    try:
        with open(clean_path, "w", encoding="utf-8") as f:
            f.write(cleaned_text or "")
    except Exception as e:
        print(f"[{ip}] 写 clean 文件失败: {e}")
    if not SAVE_CLEAN_ONLY and raw_text is not None:
        raw_path = os.path.join(prefix, base + "_raw.txt")
        try:
            with open(raw_path, "w", encoding="utf-8") as f:
                f.write(raw_text or "")
        except Exception as e:
            print(f"[{ip}] 写 raw 文件失败: {e}")
    return clean_path

# ---------- 主流程 ----------
def process_device(ip, user, pwd, port=SSH_PORT):
    ip_str = ip
    print("-> 处理:", ip_str)
    try:
        client = open_ssh_client(ip, username=user, password=pwd, port=port)
    except Exception as e:
        msg = f"SSH 连接失败: {e}"
        print(msg)
        return {"ip": ip, "ok": False, "error": msg}
    try:
        shell = client.invoke_shell()
        time.sleep(0.2)
    except Exception as e:
        client.close()
        msg = f"invoke_shell 失败: {e}"
        print(msg)
        return {"ip": ip, "ok": False, "error": msg}

    # 读取初始 banner(不保存)
    time.sleep(0.2)
    try:
        intro = recv_all_from_shell(shell, timeout=1.0)
    except Exception:
        intro = ""

    # 尝试关闭分页(输出不保存)
    try:
        _ = try_disable_paging(shell)
    except Exception:
        pass

    # 探测厂商(发送若干探测命令,输出不保存)
    detect_cmds = ["display version", "show version", "get system status", "show system info", "uname -a"]
    detect_text = intro or ""
    for dc in detect_cmds:
        try:
            out, err = send_cmd_and_recv(shell, dc, cmd_timeout=1.5)
        except Exception:
            out = ""
        if out:
            detect_text += "\n" + out
            if re.search(r"huawei|h3c|cisco|forti|fortigate|ruijie|hillstone|sangfor|ios|vrp|comware", out, re.IGNORECASE):
                break

    vendor = detect_vendor_from_text(detect_text) or "unknown"
    print(f"   识别厂商为: {vendor}")

    # 尝试按厂商命令导出配置 —— 仅在配置命令上开始 capture 并保存
    cmds = VENDOR_CONFIG_CMDS.get(vendor, VENDOR_CONFIG_CMDS["unknown"])
    successful_cmd, raw_output, tried_cmds = try_commands_and_collect(shell, cmds, cmd_timeout=CMD_TIMEOUT)

    # 若没有拿到,作为最后手段再尝试常见命令并长时间读取(这些输出将被当作配置输出尝试保存)
    if not raw_output:
        fallback_cmds = ["display current-configuration", "show running-config", "show full-configuration"]
        for fc in fallback_cmds:
            try:
                out, err = send_cmd_and_recv(shell, fc, cmd_timeout=10.0)
            except Exception:
                out = ""
            tried_cmds.append(fc)
            if out and out.strip():
                raw_output = out
                successful_cmd = fc
                break

    # 如果仍然没有明显配置输出,尝试读取 shell 剩余的输出保存以便排查(但这并非配置)
    if not raw_output:
        time.sleep(0.5)
        more = recv_all_from_shell(shell, timeout=2.0)
        if more and more.strip():
            raw_output = more

    # 只保存配置相关内容:clean 后写入文件;raw 可选
    cleaned = clean_config_output_v2(raw_output or "", successful_cmd=successful_cmd, tried_cmds=tried_cmds)
    saved_path = save_output(ip, vendor, raw_output, cleaned)

    print(f"   输出已保存: {saved_path}")

    try:
        shell.close()
    except Exception:
        pass
    try:
        client.close()
    except Exception:
        pass

    ok = bool(cleaned and _CONFIG_HINTS.search(cleaned))
    return {"ip": ip, "ok": ok, "vendor": vendor, "path": saved_path, "raw_out": raw_output, "clean_out": cleaned}

def main():
    print("批量导出设备配置脚本(仅保存配置阶段输出)")
    ips = read_ip_file(IP_FILE)
    if not ips:
        print("ip 列表为空,请在 ip.txt 中每行写入一个 IP(或 ip,username,password)")
        return
    need_cred = any(u is None or p is None for (_, u, p) in ips)
    default_user = None
    default_pass = None
    if need_cred:
        default_user = input("请输入 SSH 用户名: ").strip()
        default_pass = getpass("请输入 SSH 密码: ")
    ensure_output_dir(OUTPUT_DIR)
    results = []
    for ip, u, p in ips:
        user = u if u else default_user
        pwd = p if p else default_pass
        if not user or not pwd:
            print(f"缺少该设备 {ip} 的用户名或密码,跳过")
            results.append({"ip": ip, "ok": False, "error": "no credentials"})
            continue
        try:
            res = process_device(ip, user, pwd, port=SSH_PORT)
            results.append(res)
        except Exception as e:
            print(f"处理 {ip} 时异常: {e}")
            results.append({"ip": ip, "ok": False, "error": str(e)})
    print("\n处理完成,总结:")
    for r in results:
        if r.get("ok"):
            print(f" {r['ip']} -> OK, vendor={r.get('vendor')}, file={r.get('path')}")
        else:
            print(f" {r['ip']} -> MAYBE FAILED (仍保存了可供排查的文件), reason={r.get('error')}, file={r.get('path')}")

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n用户中断,退出")
        sys.exit(1)

四、实验结果:

建议在终端运行,pycharm运行,getpass会卡住

可以看到每台设备都成功登录,并将配置保存在txt文件(2.15应该是设备问题)

最终生成的文件:

实验完成!