串口自动化断电测试

发布于:2025-03-22 ⋅ 阅读:(43) ⋅ 点赞:(0)

串口自动化断电测试,等待设备的串口重启完成后,通过ping命令测试设备是否可以网络ping通,然后断电重启测试。

from koardctl import KoardCtl, get_utc_now
import time
import sys
import paramiko
from scp import SCPClient
import threading
import shutil
import argparse
import os
from datetime import datetime
import serial
import subprocess
import platform


LOG = None
 
class Log:
    def __init__(self, logfile):
        now = datetime.now()
        date_time_str = now.strftime("%Y%m%d%H%M%S")
        logfile += date_time_str
        self.logfile_ = logfile
        self.fd_ = open(logfile, "w+", encoding='utf-8')

    def write(self, log):
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        log_entry = f"[{timestamp}] {log}" 
        
        print(log_entry)
        self.fd_.write(log_entry + '\n') 

    def flush(self):
        self.fd_.flush()

    def close(self):
        self.fd_.close() 



def logger(log):
    global LOG
    if LOG == None:
        LOG = Log("log.txt")
    LOG.write(log)




class Ping:
    def __init__(self, address, count=10):
        self.address = address
        self.count = count

    def ping(self):
        """执行 ping 命令并返回是否可达"""
        param = '-n' if platform.system().lower() == 'windows' else '-c'
        
        # 构造 ping 命令
        command = ['ping', param, str(self.count), self.address]
        
        try:
            # 执行 ping 命令
            output = subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True)
            print(output)  # 打印命令输出
            
            # 分析输出内容
            if self.is_reachable(output):
                print(f"IPv6 地址 {self.address} 是可达的!")
                return True
            else:
                print(f"IPv6 地址 {self.address} 不可达!")
                return False
        except subprocess.CalledProcessError as e:
            print(e.output)  # 打印错误信息
            print(f"IPv6 地址 {self.address} 不可达!")
            return False

    def ping_interval(self, interval):
        ping_fail = 0
        for i in range(self.count):
            command = ['ping', '-n', '1', self.address]  # 发送一次ping请求

            try:
                output = subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True)
                logger(output)  # 打印命令输出

                # 分析输出内容
                if self.is_reachable(output):
                    logger(f"IPv6 地址 {self.address} 是可达的!")
                    # return True
                else:
                    logger(f"IPv6 地址 {self.address} 不可达!")
                    ping_fail += 1
                    # return False
            
            except Exception as e:
                logger(f"执行ping命令时发生错误: {e}")

            # 等待指定的间隔时间
            time.sleep(interval)
        
        if ping_fail > 3:
            logger(f"ping_fail {ping_fail}")
            return False
        else :
            return True

    def is_reachable(self, output):
        """检查 ping 输出,确定目标是否可达"""
        # 根据不同操作系统分析输出内容
        if platform.system().lower() == 'windows':
            return "Reply from fc00:0:5:19::51:" in output  # Windows 输出包含 TTL 字段表示成功
        else:
            return "bytes from" in output  # Linux/Unix 输出中的 bytes from 表示成功

class SerialPort:
    def __init__(self, port, baudrate=9600):
        self.port = port
        self.baudrate = baudrate
        self.serial = None
        self.thread = None
        self.running = False
        self.line_ = ""
        self.start_ok_ = 0
        self.lock_ = threading.Lock()
        self.wait_line_ = 0

    def open(self):
        """打开串口"""
        if self.serial is not None and self.serial.is_open:
            logger("串口已经打开")
            return

        try:
            self.serial = serial.Serial(self.port, self.baudrate, timeout=1)
            self.running = True
            self.thread = threading.Thread(target=self.read_data)
            self.thread.daemon = True  # 设置为守护线程
            self.thread.start()
            logger(f"成功打开串口 {self.port}")
        except serial.SerialException as e:
            logger(f"无法打开串口: {e}")

    def close(self):
        """关闭串口"""
        if self.serial is not None and self.serial.is_open:
            self.running = False
            self.serial.close()
            logger("串口已关闭")

    def read_data(self):
        global start_ok
        """后台读取串口数据"""
        while self.running:

            if self.serial.in_waiting > 0:  # 检查是否有数据可读
                data = self.serial.readline().decode('utf-8').strip()  # 读取一行数据并解码
                logger(f"{data}")  # 打印收到的数据
                self.handle_data(data)
                self.lock_.acquire()
                if self.wait_line_ == 1:
                    self.line_ += data
                self.lock_.release()
            # time.sleep(0.1)  # 避免占用过多 CPU


    def handle_data(self, data):
        if data.startswith("oot@inno:~#") :
            logger("!!!!!!!!!!!!! wait start ok !!!!!!!!")
            self.start_ok_ = 1
        if data.find("oot@inno:~#") != -1:
            logger("!!!!!!!!!!!!! wait start ok !!!!!!!!")
            self.start_ok_ = 1
        

    def write_data(self, data):
        """写入数据到串口"""
        if self.serial is not None and self.serial.is_open:
            self.serial.write(data.encode('utf-8'))
            logger(f"send->{data}")
        else:
            logger("串口未打开,无法发送数据")

    def wait_until(self, data):
        self.wait_line_ = 1
        while True:
            self.lock_.acquire()
            if self.line_.find(data) != -1:
                self.line_ = ""
                self.lock_.release()
                break
            else:
                self.line_ = ""
            self.lock_.release()
            time.sleep(0.2)  # 避免占用过多 CPU
        self.wait_line_ = 0

    def write_lilne(self, data):
        self.write_data(data+"\n")

    def is_open(self):
        """检查串口是否打开"""
        return self.serial is not None and self.serial.is_open


class Ssh:
    def __init__(self, host, username, password):
        self.host_ = host
        self.username_ = username
        self.password_ = password
        self.client_ = None
        self.channel_ = None
    
    def connect(self) :
        self.client_ = paramiko.SSHClient()
        self.client_.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            logger(f"Connecting to {self.host_}...")
            self.client_.connect(hostname=self.host_, username=self.username_, password=self.password_)
            
            self.channel_ = self.client_.invoke_shell()
            logger("Interactive SSH session started.")

        except Exception as e:
            logger(f"Error: {e}")
        finally:
            self.client_.close()

    def send(self, line):
        
        # 输入命令
        command = "your_command_here"  # 替换为你想要执行的命令
        self.channel_.send(command + '\n')  # 发送命令
        
        # 等待命令执行完成
        time.sleep(2)  # 根据命令的复杂性调整时间

        # 接收输出
        output = self.channel_.recv(65535).decode('utf-8')
        logger(output)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--circle', type=int, help='Number of circles')
    args = parser.parse_args()
    circle = args.circle

    koard = KoardCtl("COM11", 9600)
    koard.power_off()

    falcon_consle = SerialPort("COM12", 115200)
    falcon_consle.open()
    PING = Ping("fc00:0:5:19::51")
    # ret = PING.ping()


    logger("======================")
    logger("start Falcon-k Lidar power-on-off test")

    start_ok = 0
    idx = 0
    while idx < 5000:
        idx += 1
        logger(f"=======ruing loop {idx}=======")
        logger(f"power on")
        koard.power_on()

        # time.sleep(100)
        # while falcon_consle.start_ok_ == 0 :
        #     time.sleep(1)

        
        falcon_consle.write_lilne("")
        falcon_consle.wait_until("oot@inno:~#")
        logger("wait ok 1")
        
        falcon_consle.write_lilne("")
        falcon_consle.wait_until("root@inno:~#")
        logger("wait ok 2")

        ret =  PING.ping_interval(0.1)
        if ret == True:
            logger("ping ok")
        else:
            logger("ping fail")
            logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            logger("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!ping fail!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            break


        logger(f"power off")
        koard.power_off()
        falcon_consle.start_ok_ = 0
        time.sleep(0.5)