qt tcpsocket编程遇到的并发问题

发布于:2025-04-04 ⋅ 阅读:(9) ⋅ 点赞:(0)

1. 单个socket中接收消息的方法要使用局部变量而非全局,避免消息频发时产生脏数据

优化后的关键代码

  • recieveInfo() 方法通过返回内部处理后的 msg 进行传递
  • if (data.indexOf("0103") == -1) { 这里增加了判断, 对数据(非注册和心跳)再进行一遍过滤
void CustomTcpSocket::handleReadyRead()
{
    // 信号灯闪烁
    emit SignalRelayer::instance()->msgInOut(true);
    // 接收到消息
    QString msg = recieveInfo();
    // 解析收到的数据
    parseRegisterMsg(msg.split(",")[0], msg.split(",")[1]);
    // 判断注册的设备号是否可用
    if(!Myapp::devEnabled(devNumber)) {
        return;
    }
    // 判断是否是业务数据
    if(isNotBusinessInfo()) return;
    // 不是0103开头的数据不处理
    QString data = msg.split(",")[1];
    if (data.indexOf("0103") == -1) {
        qDebug() << "不是0103开头数据不做处理, data: " << data;
        return;
    }
    // 电路数据处理
    handleReadReadImpl(data);
}

以下是socket自定义类源码.cpp

#include "customtcpsocket.h"

#include <iostream>

CustomTcpSocket::CustomTcpSocket(QObject *parent)
    : QTcpSocket(parent)
{
    sendMsg = QString("01 03 00 00 00 06 C5 C8");
    // 监听获取设备数据
    connect(this, &QIODevice::readyRead, this, &CustomTcpSocket::handleReadyRead);
    // 监听采集频率变更
    connect(SignalRelayer::instance(), &SignalRelayer::socketIntervalTimeChanged, this, &CustomTcpSocket::handleChangeIntervalTime);
    connect(SignalRelayer::instance(), &SignalRelayer::socketDestroy, this, [=]{
        this->disconnectFromHost();
    });
}

/**
 * @brief 控制器初始化
 */
void CustomTcpSocket::controllerInit() {
    // 电流预警控制器初始化
    elecWarnController = new ElecStreamWarnController(this, this->devNumber, this->devName, this->speakRender);
    // 在线预警控制器
    onlineWarnController = new OnlineWarnController(this, this->devNumber, this->devName, this->speakRender);
    // 拉取数据预警控制器
    pullDataWarnController = new PullDataWarnController(this, this->devNumber, this->devName, this->speakRender);
}

CustomTcpSocket::~CustomTcpSocket()
{
    qDebug() << "detete socket";
}

void CustomTcpSocket::parseRegisterMsg(QString &asciiString, QString &data)
{
    // 若为注册记录设备id等信息
    if(asciiString.left(1).compare("*") == 0) {
        // 是注册消息, 更新dev_unit_online字段
        QString tmpDevNumber = asciiString.mid(1, 8);
        if (tmpDevNumber.compare(devNumber) == 0) {
            qDebug() << "设备已注册, 不再处理.";
            return;
        }
        // 判断此设备是否存在
        DevInfo devInfo = existDev(tmpDevNumber);
        if (devInfo.devNumber.compare("") == 0) {
            // 设备不存在
            return;
        }
        // 赋值通讯单元名称
        this->devName = devInfo.devName;
        devNumber = tmpDevNumber;
        // 初始化消息内容处理器
        this->speakRender = new DevSpeakRender(this, this->devName, this->peerPort(), this->clientIp);
        // 提取必要字符
        telephone = asciiString.replace(devNumber, "").replace("#", "").replace(",", "").replace("*", "");
        this->isRegisterMsg = true;
        emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), asciiString, "注册"));
        // 更新在线状态
//        Myapp::updateDevUnitOnline(this->devNumber, Myapp::ONLINE_STATUS[1]);
        // 发送心跳通知. 此处注册成功
        emit SignalRelayer::instance()->socketPublish(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[2].toInt(), "设备注册成功, 心跳检测开启."));
        // 控制器初始化
        controllerInit();
        // 初始化拉取定时任务
        initPullTimer();
        // 发送重置用于在线检测
        emit SignalRelayer::instance()->flushOfflineDetectManager();
        return;
    }
    // 非注册不处理
    if (!isRegister()) {
        return;
    }
    if (asciiString.compare("&&&&") == 0) {
        // 触发心跳
        onlineWarnController->detectedBeat();
        this->isHeartMsg = true;
        // 接收消息打印
        emit SignalRelayer::instance()->consoleMsg((this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), data, "收到心跳包")));
        return;
    } else {
        this->isHeartMsg = false;
        this->isRegisterMsg = false;
        emit SignalRelayer::instance()->consoleMsg((this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), data, "接收消息")));
    }

}

void CustomTcpSocket::durationMsg(QString msg, int commDirt) {
    if (!isRegister()) {
        return;
    }
    emit SignalRelayer::instance()->transferMsgDuration(RegisterLogDto(this->devNumber, this->clientIp, this->peerPort(), msg, commDirt));
}

CustomTcpSocket::DevInfo CustomTcpSocket::existDev(const QString &tmpDevNumber) {
    QString sql = QString("SELECT u.name, d.numb "
                  " from dev_manage_ref c"
                  " inner join dev_manage_ref e on e.pid = c.id"
                  " inner join dev_manage_ref w on w.pid = e.id"
                  " inner join dev_manage_ref u on u.pid = w.id"
                  " inner join dev_unit d on d.id = u.id where d.numb = '%1' and d.enable = 1").arg(tmpDevNumber);
    QSqlQuery query;
    if (!query.exec(sql) || !query.next() || query.value(1).isNull()) {
        return DevInfo();
    }
    return DevInfo(query.value(0).toString(), query.value(1).toString());
}

void CustomTcpSocket::handleReadyRead()
{
    // 信号灯闪烁
    emit SignalRelayer::instance()->msgInOut(true);
    // 接收到消息
    QString msg = recieveInfo();
    // 解析收到的数据
    parseRegisterMsg(msg.split(",")[0], msg.split(",")[1]);
    // 判断注册的设备号是否可用
    if(!Myapp::devEnabled(devNumber)) {
        return;
    }
    // 判断是否是业务数据
    if(isNotBusinessInfo()) return;
    // 不是0103开头的数据不处理
    QString data = msg.split(",")[1];
    if (data.indexOf("0103") == -1) {
        qDebug() << "不是0103开头数据不做处理, data: " << data;
        return;
    }
    // 电路数据处理
    handleReadReadImpl(data);
}

QString CustomTcpSocket::recieveInfo()
{
    QByteArray byteArray = this->readAll();
    QString reciveInfoMsg = byteArray.toHex();
    qDebug() << "socket.devNumber: " << this->devNumber << ", devName: " << this->devName << ", message: " << reciveInfoMsg;
    // 持久化记录消息, 记录原始记录
    durationMsg(reciveInfoMsg, Myapp::COMM_DIRECT[1].toInt());
    byteArray.clear();
    byteArray = QByteArray::fromHex(reciveInfoMsg.toUtf8());
    asciiString = QString::fromUtf8(byteArray);
    byteArray.clear();
    return QString("%1,%2").arg(asciiString).arg(reciveInfoMsg);

}

bool CustomTcpSocket::isNotBusinessInfo()
{
    // 若是注册, 返回不是业务数据
    if (isRegisterMsg) {
        return true;
    }
    // 若是心跳, 更新在线状态
    if (isHeartMsg) {
        Myapp::updateDevUnitOnline(devNumber, Myapp::ONLINE_STATUS[1]);
        return true;
    }
    // 若拉取异常, 不处理业务数据, 接收的数据会异常
//    if (pullDataWarnController->pullDataEchoErrNotifyFlag) {
//        return true;
//    }
    return false;
}

void CustomTcpSocket::handleReadReadImpl(QString &asciiString)
{
    // 转换十进制
    QString midstr = asciiString.mid(4,2);
    // 获取返回数据长度
    QByteArray arylen = QByteArray::fromHex(midstr.toLocal8Bit());
    // 获取三路电流数值, 得到10进制
    QString lenResult = hexToInt(arylen);
    // 判断是否异常更新设备
    // 获取 总功率值(在第3块(4个为一块)16进制截取4位)
    int ab = getIntResultByIndex(asciiString, 4);
    int bc = getIntResultByIndex(asciiString, 5);
    int ca = getIntResultByIndex(asciiString, 6);
    QString recordType;
    // 处理电流
    elecWarnController->handleData(ab, bc, ca);
    // 拉取正常进行处理
    pullDataWarnController->handleNormal();
}

void CustomTcpSocket::initPullTimer() {
    pullDataTimer = new QTimer(this);
    connect(pullDataTimer, &QTimer::timeout, this, &CustomTcpSocket::handleWrite);
    pullDataTimer->start(Myapp::secToMillSec(Myapp::getValueByKey(Myapp::SERV_INTERVAL_TIME_KEY)));
}

void CustomTcpSocket::handleWrite()
{
    // 设备ID为空则不拉取数据
    if (!isRegister()) {
        return;
    }
    // 判断前一次数据是否拉取成功.
    this->pullDataWarnController->echoDetect();
    QByteArray data = QByteArray::fromHex(sendMsg.toLocal8Bit());
    emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[0].toInt(), this->sendMsg));
    // 持久化发送消息
    durationMsg(sendMsg, Myapp::COMM_DIRECT[0].toInt());
    // 向服务端发送
    this->write(data);
}
// ---------- 注册与注销

bool CustomTcpSocket::isRegister() {
    return !devNumber.isNull() && !devNumber.isEmpty();
}

/**
 * @brief 注销socket, 避免在断网瞬间还在接收消息, 但内容已经不正确, 读取不到数据了
 */
void CustomTcpSocket::logOut() {
    this->devNumber = "";
}

// ---------- 采集频率

void CustomTcpSocket::handleChangeIntervalTime(int intervalTime)
{
    this->pullDataTimer->setInterval(Myapp::secToMillSec(intervalTime));
    emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleWrapperForDev(QString("采集频率重置为: %1 秒.").arg(intervalTime)));
}

// --------- 解析设备回应的电流等数据↓

int CustomTcpSocket::getIntResultByIndex(const QString &hexString, int index)
{
    QString wantStr = hexString.mid(3*2+((index-1)*4), 4);
//    qDebug() << "index: " << index << ", wantStr: " << wantStr;
    QByteArray arylen = QByteArray::fromHex(wantStr.toLocal8Bit());
    QString high = wantStr.left(2);
    QString low = wantStr.right(2);
    return 16*hexToInt(QByteArray::fromHex(high.toLocal8Bit())).toInt()+hexToInt(QByteArray::fromHex(low.toLocal8Bit())).toInt();
}

QString CustomTcpSocket::hexToInt(const QByteArray &byteAry)
{
   QString result;
   for (int i = 0; i < byteAry.size(); i++)
   {
       quint8 byte = byteAry[i];
       result += QString::number(byte, 10);
   }
   return result;
}