1. 单个socket中接收消息的方法要使用局部变量而非全局,避免消息频发时产生脏数据
优化后的关键代码
recieveInfo() 方法通过返回内部处理后的
msg进行传递
if (data.indexOf("0103") == -1) {
这里增加了判断, 对数据(非注册和心跳)再进行一遍过滤
void CustomTcpSocket::handleReadyRead()
{
// 信号灯闪烁
emit SignalRelayer::instance()->msgInOut(true);
// 接收到消息
QString msg = recieveInfo();
// 解析收到的数据
parseRegisterMsg(msg.split(",")[0], msg.split(",")[1]);
// 判断注册的设备号是否可用
if(!Myapp::devEnabled(devNumber)) {
return;
}
// 判断是否是业务数据
if(isNotBusinessInfo()) return;
// 不是0103开头的数据不处理
QString data = msg.split(",")[1];
if (data.indexOf("0103") == -1) {
qDebug() << "不是0103开头数据不做处理, data: " << data;
return;
}
// 电路数据处理
handleReadReadImpl(data);
}
以下是socket自定义类源码.cpp
#include "customtcpsocket.h"
#include <iostream>
CustomTcpSocket::CustomTcpSocket(QObject *parent)
: QTcpSocket(parent)
{
sendMsg = QString("01 03 00 00 00 06 C5 C8");
// 监听获取设备数据
connect(this, &QIODevice::readyRead, this, &CustomTcpSocket::handleReadyRead);
// 监听采集频率变更
connect(SignalRelayer::instance(), &SignalRelayer::socketIntervalTimeChanged, this, &CustomTcpSocket::handleChangeIntervalTime);
connect(SignalRelayer::instance(), &SignalRelayer::socketDestroy, this, [=]{
this->disconnectFromHost();
});
}
/**
* @brief 控制器初始化
*/
void CustomTcpSocket::controllerInit() {
// 电流预警控制器初始化
elecWarnController = new ElecStreamWarnController(this, this->devNumber, this->devName, this->speakRender);
// 在线预警控制器
onlineWarnController = new OnlineWarnController(this, this->devNumber, this->devName, this->speakRender);
// 拉取数据预警控制器
pullDataWarnController = new PullDataWarnController(this, this->devNumber, this->devName, this->speakRender);
}
CustomTcpSocket::~CustomTcpSocket()
{
qDebug() << "detete socket";
}
void CustomTcpSocket::parseRegisterMsg(QString &asciiString, QString &data)
{
// 若为注册记录设备id等信息
if(asciiString.left(1).compare("*") == 0) {
// 是注册消息, 更新dev_unit_online字段
QString tmpDevNumber = asciiString.mid(1, 8);
if (tmpDevNumber.compare(devNumber) == 0) {
qDebug() << "设备已注册, 不再处理.";
return;
}
// 判断此设备是否存在
DevInfo devInfo = existDev(tmpDevNumber);
if (devInfo.devNumber.compare("") == 0) {
// 设备不存在
return;
}
// 赋值通讯单元名称
this->devName = devInfo.devName;
devNumber = tmpDevNumber;
// 初始化消息内容处理器
this->speakRender = new DevSpeakRender(this, this->devName, this->peerPort(), this->clientIp);
// 提取必要字符
telephone = asciiString.replace(devNumber, "").replace("#", "").replace(",", "").replace("*", "");
this->isRegisterMsg = true;
emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), asciiString, "注册"));
// 更新在线状态
// Myapp::updateDevUnitOnline(this->devNumber, Myapp::ONLINE_STATUS[1]);
// 发送心跳通知. 此处注册成功
emit SignalRelayer::instance()->socketPublish(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[2].toInt(), "设备注册成功, 心跳检测开启."));
// 控制器初始化
controllerInit();
// 初始化拉取定时任务
initPullTimer();
// 发送重置用于在线检测
emit SignalRelayer::instance()->flushOfflineDetectManager();
return;
}
// 非注册不处理
if (!isRegister()) {
return;
}
if (asciiString.compare("&&&&") == 0) {
// 触发心跳
onlineWarnController->detectedBeat();
this->isHeartMsg = true;
// 接收消息打印
emit SignalRelayer::instance()->consoleMsg((this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), data, "收到心跳包")));
return;
} else {
this->isHeartMsg = false;
this->isRegisterMsg = false;
emit SignalRelayer::instance()->consoleMsg((this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[1].toInt(), data, "接收消息")));
}
}
void CustomTcpSocket::durationMsg(QString msg, int commDirt) {
if (!isRegister()) {
return;
}
emit SignalRelayer::instance()->transferMsgDuration(RegisterLogDto(this->devNumber, this->clientIp, this->peerPort(), msg, commDirt));
}
CustomTcpSocket::DevInfo CustomTcpSocket::existDev(const QString &tmpDevNumber) {
QString sql = QString("SELECT u.name, d.numb "
" from dev_manage_ref c"
" inner join dev_manage_ref e on e.pid = c.id"
" inner join dev_manage_ref w on w.pid = e.id"
" inner join dev_manage_ref u on u.pid = w.id"
" inner join dev_unit d on d.id = u.id where d.numb = '%1' and d.enable = 1").arg(tmpDevNumber);
QSqlQuery query;
if (!query.exec(sql) || !query.next() || query.value(1).isNull()) {
return DevInfo();
}
return DevInfo(query.value(0).toString(), query.value(1).toString());
}
void CustomTcpSocket::handleReadyRead()
{
// 信号灯闪烁
emit SignalRelayer::instance()->msgInOut(true);
// 接收到消息
QString msg = recieveInfo();
// 解析收到的数据
parseRegisterMsg(msg.split(",")[0], msg.split(",")[1]);
// 判断注册的设备号是否可用
if(!Myapp::devEnabled(devNumber)) {
return;
}
// 判断是否是业务数据
if(isNotBusinessInfo()) return;
// 不是0103开头的数据不处理
QString data = msg.split(",")[1];
if (data.indexOf("0103") == -1) {
qDebug() << "不是0103开头数据不做处理, data: " << data;
return;
}
// 电路数据处理
handleReadReadImpl(data);
}
QString CustomTcpSocket::recieveInfo()
{
QByteArray byteArray = this->readAll();
QString reciveInfoMsg = byteArray.toHex();
qDebug() << "socket.devNumber: " << this->devNumber << ", devName: " << this->devName << ", message: " << reciveInfoMsg;
// 持久化记录消息, 记录原始记录
durationMsg(reciveInfoMsg, Myapp::COMM_DIRECT[1].toInt());
byteArray.clear();
byteArray = QByteArray::fromHex(reciveInfoMsg.toUtf8());
asciiString = QString::fromUtf8(byteArray);
byteArray.clear();
return QString("%1,%2").arg(asciiString).arg(reciveInfoMsg);
}
bool CustomTcpSocket::isNotBusinessInfo()
{
// 若是注册, 返回不是业务数据
if (isRegisterMsg) {
return true;
}
// 若是心跳, 更新在线状态
if (isHeartMsg) {
Myapp::updateDevUnitOnline(devNumber, Myapp::ONLINE_STATUS[1]);
return true;
}
// 若拉取异常, 不处理业务数据, 接收的数据会异常
// if (pullDataWarnController->pullDataEchoErrNotifyFlag) {
// return true;
// }
return false;
}
void CustomTcpSocket::handleReadReadImpl(QString &asciiString)
{
// 转换十进制
QString midstr = asciiString.mid(4,2);
// 获取返回数据长度
QByteArray arylen = QByteArray::fromHex(midstr.toLocal8Bit());
// 获取三路电流数值, 得到10进制
QString lenResult = hexToInt(arylen);
// 判断是否异常更新设备
// 获取 总功率值(在第3块(4个为一块)16进制截取4位)
int ab = getIntResultByIndex(asciiString, 4);
int bc = getIntResultByIndex(asciiString, 5);
int ca = getIntResultByIndex(asciiString, 6);
QString recordType;
// 处理电流
elecWarnController->handleData(ab, bc, ca);
// 拉取正常进行处理
pullDataWarnController->handleNormal();
}
void CustomTcpSocket::initPullTimer() {
pullDataTimer = new QTimer(this);
connect(pullDataTimer, &QTimer::timeout, this, &CustomTcpSocket::handleWrite);
pullDataTimer->start(Myapp::secToMillSec(Myapp::getValueByKey(Myapp::SERV_INTERVAL_TIME_KEY)));
}
void CustomTcpSocket::handleWrite()
{
// 设备ID为空则不拉取数据
if (!isRegister()) {
return;
}
// 判断前一次数据是否拉取成功.
this->pullDataWarnController->echoDetect();
QByteArray data = QByteArray::fromHex(sendMsg.toLocal8Bit());
emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleFullPrint(Myapp::COMM_DIRECT[0].toInt(), this->sendMsg));
// 持久化发送消息
durationMsg(sendMsg, Myapp::COMM_DIRECT[0].toInt());
// 向服务端发送
this->write(data);
}
// ---------- 注册与注销
bool CustomTcpSocket::isRegister() {
return !devNumber.isNull() && !devNumber.isEmpty();
}
/**
* @brief 注销socket, 避免在断网瞬间还在接收消息, 但内容已经不正确, 读取不到数据了
*/
void CustomTcpSocket::logOut() {
this->devNumber = "";
}
// ---------- 采集频率
void CustomTcpSocket::handleChangeIntervalTime(int intervalTime)
{
this->pullDataTimer->setInterval(Myapp::secToMillSec(intervalTime));
emit SignalRelayer::instance()->consoleMsg(this->speakRender->consoleWrapperForDev(QString("采集频率重置为: %1 秒.").arg(intervalTime)));
}
// --------- 解析设备回应的电流等数据↓
int CustomTcpSocket::getIntResultByIndex(const QString &hexString, int index)
{
QString wantStr = hexString.mid(3*2+((index-1)*4), 4);
// qDebug() << "index: " << index << ", wantStr: " << wantStr;
QByteArray arylen = QByteArray::fromHex(wantStr.toLocal8Bit());
QString high = wantStr.left(2);
QString low = wantStr.right(2);
return 16*hexToInt(QByteArray::fromHex(high.toLocal8Bit())).toInt()+hexToInt(QByteArray::fromHex(low.toLocal8Bit())).toInt();
}
QString CustomTcpSocket::hexToInt(const QByteArray &byteAry)
{
QString result;
for (int i = 0; i < byteAry.size(); i++)
{
quint8 byte = byteAry[i];
result += QString::number(byte, 10);
}
return result;
}