目录
一、实验拓扑
二、实验目的
ssh远程登录后,识别设备类型(华三、华为、锐捷、山石、飞塔、深信服等),再输入对应设备的命令进行配置导出
三、实验步骤
实验开始之前先搭好环境,测试无误再开始
实验思路:
利用ip.txt,存放需要登录的设备IP
再一台一台登录,更具设备独有的命令进行识别,再对配置进行导出保存。
代码部分:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
批量导出设备配置(改进版)
- 在探测/关闭分页阶段不保存任何输出
- 仅从首次成功的“导出配置”命令开始保存 raw 输出,并对其进行清理后保存 clean 输出
- 支持自动翻页、错误判断、厂商识别等
- 输出文件:
output/<ip>_<vendor>_<ts>_raw.txt (可选)
output/<ip>_<vendor>_<ts>_clean.txt
配置项可在脚本顶部调整。
"""
import paramiko
import time
import re
import os
import sys
from getpass import getpass
from datetime import datetime
# ---------- 配置区域(可按需修改) ----------
IP_FILE = "ip.txt"
OUTPUT_DIR = "output"
SSH_PORT = 22
CONNECT_TIMEOUT = 10
CMD_TIMEOUT = 6.0
READ_CHUNK = 65536
# 是否只保存 clean(True)或同时保存 raw(False)
SAVE_CLEAN_ONLY = True
VENDOR_KEYWORDS = {
"huawei": ["huawei", "huawei vrp", "huawei Technologies", "Huawei"],
"h3c": ["h3c", "h3c technologies", "Comware", "H3C"],
"cisco": ["cisco", "ios", "cisco ios", "Cisco IOS Software"],
"ruijie": ["ruijie", "ruijie networks", "rzos", "Ruijie"],
"fortigate": ["fortigate", "fortios", "fortinet", "FortiGate"],
"hillstone": ["hillstone", "hillstone networks", "shanshi", "HS"],
"sangfor": ["sangfor", "深信服"],
}
# 每个厂商尝试的“导出配置”命令列表
VENDOR_CONFIG_CMDS = {
"cisco": ["terminal length 0", "show running-config"],
"huawei": ["display current-configuration", "screen-length 0 temporary", "display this-configuration"],
"h3c": ["display current-configuration", "display this-configuration", "screen-length 0 temporary"],
"ruijie": ["display current-configuration", "show running-config", "screen-length 0 temporary"],
"fortigate": ["show full-configuration", "get system status", "show running-config", "show"],
"hillstone": ["display current-configuration", "show running-config", "terminal length 0"],
"sangfor": ["display current-configuration", "show running-config"],
# fallback
"unknown": ["display current-configuration", "show running-config", "show full-configuration", "show"]
}
# 一些通用的关闭分页命令(尝试但不将其直接视为导出成功)
PAGING_CMDS = [
"terminal length 0",
"screen-length 0 temporary",
"screen-length disable",
"page 0",
"set cli pagination off",
"no page",
"undo page",
]
# ---------- 正则/判定模式 ----------
_ERR_PAT = re.compile(
r"unrecognized command|unknown command|invalid input|command not found|% Unrecognized|% Invalid|% Unknown|^%|[\^]\s*$",
re.IGNORECASE,
)
_PAGING_PAT = re.compile(
r"--More--|-- MORE --|--More--|\[More\]|Press any key|<--- More --->|--More--|More:|\(q\)|\-\-more\-\-",
re.IGNORECASE,
)
_CONFIG_HINTS = re.compile(
r"\b(hostname|sysname|interface|system-view|vlan|ip address|current-configuration|running-config|service password-encryption|ntp server|snmp-server|boot-image|bootrom|startup-config|device name)\b",
re.IGNORECASE,
)
# ---------- 辅助函数 ----------
def read_ip_file(path):
ips = []
if not os.path.exists(path):
print("ip 文件不存在:", path)
return ips
with open(path, "r", encoding="utf-8") as f:
for ln in f:
ln = ln.strip()
if not ln or ln.startswith("#"):
continue
parts = [p.strip() for p in ln.split(",")]
if len(parts) == 1:
ips.append((parts[0], None, None))
elif len(parts) >= 3:
ips.append((parts[0], parts[1], parts[2]))
else:
ips.append((parts[0], parts[1] if len(parts) > 1 else None, None))
return ips
def ensure_output_dir(path):
if not os.path.exists(path):
os.makedirs(path, exist_ok=True)
def timestamp():
return datetime.now().strftime("%Y%m%d_%H%M%S")
def open_ssh_client(ip, username, password, port=SSH_PORT, timeout=CONNECT_TIMEOUT):
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, port=port, username=username, password=password, timeout=timeout, look_for_keys=False, allow_agent=False)
return client
def recv_all_from_shell(shell, timeout=CMD_TIMEOUT, pause=0.2):
"""
非阻塞读取:读取直到超时段内没有新数据。
"""
output = b""
end_time = time.time() + timeout
while True:
while shell.recv_ready():
try:
chunk = shell.recv(READ_CHUNK)
if not chunk:
break
output += chunk
end_time = time.time() + timeout
except Exception:
break
if time.time() > end_time:
break
time.sleep(pause)
try:
return output.decode('utf-8', errors='ignore')
except Exception:
return output.decode('latin1', errors='ignore')
def send_cmd_and_recv(shell, cmd, cmd_timeout=CMD_TIMEOUT, short_sleep=0.2):
"""
发送命令并收集输出(支持自动翻页)。
返回 (out_str, error_str)
"""
if not cmd.endswith("\n"):
to_send = cmd + "\n"
else:
to_send = cmd
try:
shell.send(to_send)
except Exception as e:
return "", "send_error: " + str(e)
time.sleep(short_sleep)
out = recv_all_from_shell(shell, timeout=cmd_timeout)
# 如果输出包含分页提示则自动翻页(发送空格)
if out and _PAGING_PAT.search(out):
accum = out
for i in range(300): # 上限,避免无限循环
try:
shell.send(" ")
except Exception:
break
time.sleep(0.12)
more = recv_all_from_shell(shell, timeout=1.0)
if not more:
break
accum += more
# 若本次返回不再包含分页提示,结束
if not _PAGING_PAT.search(more):
break
out = accum
return out, None
def try_disable_paging(shell):
"""
尝试发送分页关闭命令(不保存这些输出)。
返回拼接的输出字符串(仅用于本地判断/日志),但调用方不会把它保存为主配置文件。
"""
accum = []
for c in PAGING_CMDS:
try:
out, err = send_cmd_and_recv(shell, c, cmd_timeout=1.0)
accum.append(f"CMD: {c}\n{out or ''}\nerr:{err}")
except Exception as e:
accum.append(f"CMD: {c}\nEXCEPTION: {e}")
return "\n".join(accum)
def detect_vendor_from_text(text):
if not text:
return None
tl = text.lower()
for vendor, keys in VENDOR_KEYWORDS.items():
for k in keys:
if k.lower() in tl:
return vendor
return None
def try_commands_and_collect(shell, cmds, cmd_timeout=CMD_TIMEOUT):
"""
依次尝试 cmds:
- 对于明显用于关闭分页/切换上下文的命令(非配置导出命令),发送后丢弃输出
- 对于疑似配置导出命令(包含 display/show running-config/current-configuration 等关键词),发起捕获并返回 raw 输出
返回 (successful_cmd_or_None, raw_output_or_None, tried_cmd_list)
tried_cmd_list 为命令字符串列表(仅便于 clean 函数去除回显)
"""
tried = []
for c in cmds:
tried.append(c)
lower = c.strip().lower()
# 判断是否可能为配置导出命令(较宽的匹配)
if any(k in lower for k in ("display current-configuration", "display this-configuration", "show running-config", "show full-configuration", "show configuration", "show config", "show running", "show full")):
# 将该命令视为配置导出命令,开始 capture
out, err = send_cmd_and_recv(shell, c, cmd_timeout=max(cmd_timeout, 10.0))
if out and out.strip():
return c, out, tried
# 若没有拿到输出,继续尝试下一个命令
continue
# 否则把它当成分页关闭或上下文切换之类的命令,发送但不当作最终配置输出
try:
send_cmd_and_recv(shell, c, cmd_timeout=1.5)
except Exception:
# 忽略错误,继续尝试下一条命令
pass
time.sleep(0.12)
# 如果循环结束仍未明确成功,则返回 None,caller 会再尝试 fallback
return None, None, tried
def clean_config_output_v2(text, successful_cmd=None, tried_cmds=None):
"""
更强力的配置清理函数(原样复用并允许传入 tried_cmds 以去掉回显)
"""
if not text:
return ""
# 1. 去掉 ANSI 控制码
t = re.sub(r'\x1B\[[0-?]*[ -/]*[@-~]', '', text)
# 2. 统一换行并按行处理
t = t.replace('\r\n', '\n').replace('\r', '\n')
lines = t.split('\n')
# 3. 构建一些便捷匹配集合
tried_set = set()
if tried_cmds:
for c in tried_cmds:
if c:
tried_set.add(c.strip().lower())
if successful_cmd:
tried_set.add(successful_cmd.strip().lower())
# helper 判断是否是 prompt 行(像 <SW6> 或 hostname> 或 hostname#)
def is_prompt_line(l):
s = l.strip()
if not s:
return False
# <SW6>
if re.match(r'^<[^<>]+>$', s):
return True
# name> or name# (单独的提示符)
if re.match(r'^[\w\-\._]+[>#]$', s):
return True
return False
# helper 判断是否是命令回显(完全等于某个尝试过的命令)
def is_cmd_echo(l):
s = l.strip().lower()
if s in tried_set:
return True
for cmd in tried_set:
if cmd and s.startswith(cmd) and len(s) <= len(cmd) + 6:
return True
return False
# helper 判断是否是错误/噪音行
def is_error_line(l):
s = l.strip()
if not s:
return False
if s.startswith('%') or s == '^' or s.startswith('^'):
return True
if re.search(r'unrecognized command|invalid input|command not found|Unrecognized', s, re.IGNORECASE):
return True
return False
# helper 判断是否为“装饰性行”(大量符号)
def is_decorative(l):
s = l.strip()
if not s:
return False
if len(s) >= 10 and (re.sub(r'[\W_]', '', s) == '' or (sum(ch.isalnum() for ch in s) / len(s)) < 0.2):
return True
if re.search(r'copyright', s, re.IGNORECASE) and len(s) < 200:
return True
return False
# 4. 先去掉明显噪声行(但保持行索引,以便后续定位 config 起点)
cleaned_lines = []
for ln in lines:
if is_prompt_line(ln):
continue
if is_cmd_echo(ln):
continue
if is_error_line(ln):
continue
if is_decorative(ln):
continue
cleaned_lines.append(ln)
# 5. 在 cleaned_lines 里寻找配置起点
cfg_start_keywords = [
re.compile(r'^\s*#\s*$', re.IGNORECASE),
re.compile(r'^\s*version\b', re.IGNORECASE),
re.compile(r'^\s*sysname\b', re.IGNORECASE),
re.compile(r'^\s*hostname\b', re.IGNORECASE),
re.compile(r'^\s*interface\b', re.IGNORECASE),
re.compile(r'current-configuration', re.IGNORECASE),
re.compile(r'running-config', re.IGNORECASE),
re.compile(r'^\s*!', re.IGNORECASE),
]
start_idx = None
for i, ln in enumerate(cleaned_lines):
for p in cfg_start_keywords:
if p.search(ln):
if p.pattern == r'^\s*#\s*$':
if i + 1 < len(cleaned_lines):
nxt = cleaned_lines[i + 1]
if re.search(r'^\s*version\b|^\s*sysname\b|^\s*hostname\b|^\s*interface\b|current-configuration|running-config', nxt, re.IGNORECASE):
start_idx = i
break
else:
start_idx = i
break
else:
start_idx = i
break
else:
start_idx = i
break
if start_idx is not None:
break
# 6. 如果没找到起点,尝试根据 CONFIG_HINTS 的 regex 找第一个出现位置
if start_idx is None:
joined = '\n'.join(cleaned_lines)
m = _CONFIG_HINTS.search(joined)
if m:
pos = m.start()
up_to = joined[:pos]
start_idx = up_to.count('\n')
else:
out = '\n'.join(l for l in cleaned_lines).strip()
out = re.sub(r'\n\s*\n+', '\n\n', out)
return out
# 7. 从 start_idx 开始取后续行,并再做最后清理(去掉行首/尾多余空格,去掉连续多空行)
final_lines = cleaned_lines[start_idx:]
while final_lines and not final_lines[0].strip():
final_lines.pop(0)
final_lines2 = []
for ln in final_lines:
if is_prompt_line(ln):
continue
if is_error_line(ln):
continue
final_lines2.append(ln.rstrip())
out = '\n'.join(final_lines2).strip()
out = re.sub(r'\n\s*\n+', '\n\n', out)
return out
def save_output(ip, vendor, raw_text, cleaned_text, prefix=OUTPUT_DIR):
"""
保存文件:如果 SAVE_CLEAN_ONLY 为 True 则仅保存 cleaned_text,否则保存 raw + clean
返回已保存的主路径(clean 的路径)
"""
ensure_output_dir(prefix)
ts = timestamp()
safe_vendor = vendor if vendor else "unknown"
base = f"{ip}_{safe_vendor}_{ts}"
clean_path = os.path.join(prefix, base + "_clean.txt")
try:
with open(clean_path, "w", encoding="utf-8") as f:
f.write(cleaned_text or "")
except Exception as e:
print(f"[{ip}] 写 clean 文件失败: {e}")
if not SAVE_CLEAN_ONLY and raw_text is not None:
raw_path = os.path.join(prefix, base + "_raw.txt")
try:
with open(raw_path, "w", encoding="utf-8") as f:
f.write(raw_text or "")
except Exception as e:
print(f"[{ip}] 写 raw 文件失败: {e}")
return clean_path
# ---------- 主流程 ----------
def process_device(ip, user, pwd, port=SSH_PORT):
ip_str = ip
print("-> 处理:", ip_str)
try:
client = open_ssh_client(ip, username=user, password=pwd, port=port)
except Exception as e:
msg = f"SSH 连接失败: {e}"
print(msg)
return {"ip": ip, "ok": False, "error": msg}
try:
shell = client.invoke_shell()
time.sleep(0.2)
except Exception as e:
client.close()
msg = f"invoke_shell 失败: {e}"
print(msg)
return {"ip": ip, "ok": False, "error": msg}
# 读取初始 banner(不保存)
time.sleep(0.2)
try:
intro = recv_all_from_shell(shell, timeout=1.0)
except Exception:
intro = ""
# 尝试关闭分页(输出不保存)
try:
_ = try_disable_paging(shell)
except Exception:
pass
# 探测厂商(发送若干探测命令,输出不保存)
detect_cmds = ["display version", "show version", "get system status", "show system info", "uname -a"]
detect_text = intro or ""
for dc in detect_cmds:
try:
out, err = send_cmd_and_recv(shell, dc, cmd_timeout=1.5)
except Exception:
out = ""
if out:
detect_text += "\n" + out
if re.search(r"huawei|h3c|cisco|forti|fortigate|ruijie|hillstone|sangfor|ios|vrp|comware", out, re.IGNORECASE):
break
vendor = detect_vendor_from_text(detect_text) or "unknown"
print(f" 识别厂商为: {vendor}")
# 尝试按厂商命令导出配置 —— 仅在配置命令上开始 capture 并保存
cmds = VENDOR_CONFIG_CMDS.get(vendor, VENDOR_CONFIG_CMDS["unknown"])
successful_cmd, raw_output, tried_cmds = try_commands_and_collect(shell, cmds, cmd_timeout=CMD_TIMEOUT)
# 若没有拿到,作为最后手段再尝试常见命令并长时间读取(这些输出将被当作配置输出尝试保存)
if not raw_output:
fallback_cmds = ["display current-configuration", "show running-config", "show full-configuration"]
for fc in fallback_cmds:
try:
out, err = send_cmd_and_recv(shell, fc, cmd_timeout=10.0)
except Exception:
out = ""
tried_cmds.append(fc)
if out and out.strip():
raw_output = out
successful_cmd = fc
break
# 如果仍然没有明显配置输出,尝试读取 shell 剩余的输出保存以便排查(但这并非配置)
if not raw_output:
time.sleep(0.5)
more = recv_all_from_shell(shell, timeout=2.0)
if more and more.strip():
raw_output = more
# 只保存配置相关内容:clean 后写入文件;raw 可选
cleaned = clean_config_output_v2(raw_output or "", successful_cmd=successful_cmd, tried_cmds=tried_cmds)
saved_path = save_output(ip, vendor, raw_output, cleaned)
print(f" 输出已保存: {saved_path}")
try:
shell.close()
except Exception:
pass
try:
client.close()
except Exception:
pass
ok = bool(cleaned and _CONFIG_HINTS.search(cleaned))
return {"ip": ip, "ok": ok, "vendor": vendor, "path": saved_path, "raw_out": raw_output, "clean_out": cleaned}
def main():
print("批量导出设备配置脚本(仅保存配置阶段输出)")
ips = read_ip_file(IP_FILE)
if not ips:
print("ip 列表为空,请在 ip.txt 中每行写入一个 IP(或 ip,username,password)")
return
need_cred = any(u is None or p is None for (_, u, p) in ips)
default_user = None
default_pass = None
if need_cred:
default_user = input("请输入 SSH 用户名: ").strip()
default_pass = getpass("请输入 SSH 密码: ")
ensure_output_dir(OUTPUT_DIR)
results = []
for ip, u, p in ips:
user = u if u else default_user
pwd = p if p else default_pass
if not user or not pwd:
print(f"缺少该设备 {ip} 的用户名或密码,跳过")
results.append({"ip": ip, "ok": False, "error": "no credentials"})
continue
try:
res = process_device(ip, user, pwd, port=SSH_PORT)
results.append(res)
except Exception as e:
print(f"处理 {ip} 时异常: {e}")
results.append({"ip": ip, "ok": False, "error": str(e)})
print("\n处理完成,总结:")
for r in results:
if r.get("ok"):
print(f" {r['ip']} -> OK, vendor={r.get('vendor')}, file={r.get('path')}")
else:
print(f" {r['ip']} -> MAYBE FAILED (仍保存了可供排查的文件), reason={r.get('error')}, file={r.get('path')}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n用户中断,退出")
sys.exit(1)
四、实验结果:
建议在终端运行,pycharm运行,getpass会卡住
可以看到每台设备都成功登录,并将配置保存在txt文件(2.15应该是设备问题)
最终生成的文件:
实验完成!