串口自动化断电测试,等待设备的串口重启完成后,通过ping命令测试设备是否可以网络ping通,然后断电重启测试。
from koardctl import KoardCtl, get_utc_now
import time
import sys
import paramiko
from scp import SCPClient
import threading
import shutil
import argparse
import os
from datetime import datetime
import serial
import subprocess
import platform
LOG = None
class Log:
def __init__(self, logfile):
now = datetime.now()
date_time_str = now.strftime("%Y%m%d%H%M%S")
logfile += date_time_str
self.logfile_ = logfile
self.fd_ = open(logfile, "w+", encoding='utf-8')
def write(self, log):
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {log}"
print(log_entry)
self.fd_.write(log_entry + '\n')
def flush(self):
self.fd_.flush()
def close(self):
self.fd_.close()
def logger(log):
global LOG
if LOG == None:
LOG = Log("log.txt")
LOG.write(log)
class Ping:
def __init__(self, address, count=10):
self.address = address
self.count = count
def ping(self):
"""执行 ping 命令并返回是否可达"""
param = '-n' if platform.system().lower() == 'windows' else '-c'
# 构造 ping 命令
command = ['ping', param, str(self.count), self.address]
try:
# 执行 ping 命令
output = subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True)
print(output) # 打印命令输出
# 分析输出内容
if self.is_reachable(output):
print(f"IPv6 地址 {self.address} 是可达的!")
return True
else:
print(f"IPv6 地址 {self.address} 不可达!")
return False
except subprocess.CalledProcessError as e:
print(e.output) # 打印错误信息
print(f"IPv6 地址 {self.address} 不可达!")
return False
def ping_interval(self, interval):
ping_fail = 0
for i in range(self.count):
command = ['ping', '-n', '1', self.address] # 发送一次ping请求
try:
output = subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True)
logger(output) # 打印命令输出
# 分析输出内容
if self.is_reachable(output):
logger(f"IPv6 地址 {self.address} 是可达的!")
# return True
else:
logger(f"IPv6 地址 {self.address} 不可达!")
ping_fail += 1
# return False
except Exception as e:
logger(f"执行ping命令时发生错误: {e}")
# 等待指定的间隔时间
time.sleep(interval)
if ping_fail > 3:
logger(f"ping_fail {ping_fail}")
return False
else :
return True
def is_reachable(self, output):
"""检查 ping 输出,确定目标是否可达"""
# 根据不同操作系统分析输出内容
if platform.system().lower() == 'windows':
return "Reply from fc00:0:5:19::51:" in output # Windows 输出包含 TTL 字段表示成功
else:
return "bytes from" in output # Linux/Unix 输出中的 bytes from 表示成功
class SerialPort:
def __init__(self, port, baudrate=9600):
self.port = port
self.baudrate = baudrate
self.serial = None
self.thread = None
self.running = False
self.line_ = ""
self.start_ok_ = 0
self.lock_ = threading.Lock()
self.wait_line_ = 0
def open(self):
"""打开串口"""
if self.serial is not None and self.serial.is_open:
logger("串口已经打开")
return
try:
self.serial = serial.Serial(self.port, self.baudrate, timeout=1)
self.running = True
self.thread = threading.Thread(target=self.read_data)
self.thread.daemon = True # 设置为守护线程
self.thread.start()
logger(f"成功打开串口 {self.port}")
except serial.SerialException as e:
logger(f"无法打开串口: {e}")
def close(self):
"""关闭串口"""
if self.serial is not None and self.serial.is_open:
self.running = False
self.serial.close()
logger("串口已关闭")
def read_data(self):
global start_ok
"""后台读取串口数据"""
while self.running:
if self.serial.in_waiting > 0: # 检查是否有数据可读
data = self.serial.readline().decode('utf-8').strip() # 读取一行数据并解码
logger(f"{data}") # 打印收到的数据
self.handle_data(data)
self.lock_.acquire()
if self.wait_line_ == 1:
self.line_ += data
self.lock_.release()
# time.sleep(0.1) # 避免占用过多 CPU
def handle_data(self, data):
if data.startswith("oot@inno:~#") :
logger("!!!!!!!!!!!!! wait start ok !!!!!!!!")
self.start_ok_ = 1
if data.find("oot@inno:~#") != -1:
logger("!!!!!!!!!!!!! wait start ok !!!!!!!!")
self.start_ok_ = 1
def write_data(self, data):
"""写入数据到串口"""
if self.serial is not None and self.serial.is_open:
self.serial.write(data.encode('utf-8'))
logger(f"send->{data}")
else:
logger("串口未打开,无法发送数据")
def wait_until(self, data):
self.wait_line_ = 1
while True:
self.lock_.acquire()
if self.line_.find(data) != -1:
self.line_ = ""
self.lock_.release()
break
else:
self.line_ = ""
self.lock_.release()
time.sleep(0.2) # 避免占用过多 CPU
self.wait_line_ = 0
def write_lilne(self, data):
self.write_data(data+"\n")
def is_open(self):
"""检查串口是否打开"""
return self.serial is not None and self.serial.is_open
class Ssh:
def __init__(self, host, username, password):
self.host_ = host
self.username_ = username
self.password_ = password
self.client_ = None
self.channel_ = None
def connect(self) :
self.client_ = paramiko.SSHClient()
self.client_.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
logger(f"Connecting to {self.host_}...")
self.client_.connect(hostname=self.host_, username=self.username_, password=self.password_)
self.channel_ = self.client_.invoke_shell()
logger("Interactive SSH session started.")
except Exception as e:
logger(f"Error: {e}")
finally:
self.client_.close()
def send(self, line):
# 输入命令
command = "your_command_here" # 替换为你想要执行的命令
self.channel_.send(command + '\n') # 发送命令
# 等待命令执行完成
time.sleep(2) # 根据命令的复杂性调整时间
# 接收输出
output = self.channel_.recv(65535).decode('utf-8')
logger(output)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--circle', type=int, help='Number of circles')
args = parser.parse_args()
circle = args.circle
koard = KoardCtl("COM11", 9600)
koard.power_off()
falcon_consle = SerialPort("COM12", 115200)
falcon_consle.open()
PING = Ping("fc00:0:5:19::51")
# ret = PING.ping()
logger("======================")
logger("start Falcon-k Lidar power-on-off test")
start_ok = 0
idx = 0
while idx < 5000:
idx += 1
logger(f"=======ruing loop {idx}=======")
logger(f"power on")
koard.power_on()
# time.sleep(100)
# while falcon_consle.start_ok_ == 0 :
# time.sleep(1)
falcon_consle.write_lilne("")
falcon_consle.wait_until("oot@inno:~#")
logger("wait ok 1")
falcon_consle.write_lilne("")
falcon_consle.wait_until("root@inno:~#")
logger("wait ok 2")
ret = PING.ping_interval(0.1)
if ret == True:
logger("ping ok")
else:
logger("ping fail")
logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
break
logger(f"power off")
koard.power_off()
falcon_consle.start_ok_ = 0
time.sleep(0.5)