概述
一个基于Qt框架实现的UDP主从服务器系统,该系统具备自动主机选举、故障转移和状态同步等关键功能,适用于分布式能源管理系统中的设备通信与协调。
系统核心功能
1. 自动主机选举与故障转移
系统通过优先级机制实现自动主机选举,当主机掉线时,系统会自动选择优先级最高的从机作为新的主机,确保系统持续运行。
关键代码实现:
void MasterServer::voteHostServer()
{
// 如果本机不在线,不参与选举
if(mapDeviceInfo.value(mPriority).onLine == false) {
return;
}
// 如果当前只有一个主机,不需要重新选举
if(judgmentMasterCircumstance().size() == 1) {
return;
}
// 按优先级排序所有设备
QList<int> listPriority = mapDeviceInfo.keys();
std::sort(listPriority.begin(), listPriority.end());
// 重置所有设备的主机状态
for(int i = 0; i < listPriority.size(); i++) {
mapDeviceInfo[listPriority.at(i)].hostServer = false;
}
// 选举优先级最高的在线设备为主机
for(int i = 0; i < listPriority.size(); i++) {
if(mapDeviceInfo.value(listPriority.at(i)).onLine == true) {
mapDeviceInfo[listPriority.at(i)].hostServer = true;
qDebug() << QString("%1 成为主机").arg(mapDeviceInfo[listPriority.at(i)].addrIp);
// 如果选举的是本机,更新本机状态
if(listPriority.at(i) == mPriority) {
bHost = true;
}
break;
}
}
}
2. 设备掉线检测与通知
系统会定期检测设备状态,当发现设备掉线时,会通过UDP广播通知所有设备更新状态。
关键代码实现:
void MasterServer::slotTimeOut()
{
qint64 curTime = QDateTime::currentSecsSinceEpoch();
int priorityDel = -1;
// 检查所有设备状态
for (const auto& priority : mapDeviceInfo.keys()) {
// 计算上次收到消息的时间差
int deltaT = curTime - mapDeviceInfo.value(priority).lastReciTime;
QString localIP = mJsonDevInfo.value("ip").toString();
// 忽略本机
if(localIP != mapDeviceInfo.value(priority).addrIp) {
if(deltaT > TIME_OUT_SECOND) {
mapDeviceInfo[priority].dropTimes++;
}
// 超过最大丢包次数,标记为离线
if(mapDeviceInfo[priority].dropTimes > MAX_DROP_TIME) {
mapDeviceInfo[priority].onLine = false;
mapDeviceInfo[priority].hostServer = false;
qDebug() << __FUNCTION__ << mapDeviceInfo[priority].addrIp << "Timeout drop";
// 如果掉线的是主机,触发重新选举
if(mapDeviceInfo[priority].hostServer) {
voteHostServer();
}
}
// 超过删除时间,从列表中移除
if(mapDeviceInfo[priority].dropTimes > TIME_OUT_DELETE) {
priorityDel = priority;
}
}
}
// 移除长时间离线的设备
if(priorityDel > 0) {
qDebug() << __FUNCTION__ << "time out delete priority" << priorityDel;
mapDeviceInfo.remove(priorityDel);
}
// 更新本机主机状态
bHost = mapDeviceInfo.value(mPriority).hostServer;
// 广播设备状态信息
if(bHost) {
// 主机广播所有设备信息
for (const auto& priority : mapDeviceInfo.keys()) {
QJsonObject jsonObj;
// 填充设备信息...
QByteArray ba = simplify(jsonObj);
mSocket->writeDatagram(ba, QHostAddress(BROADCAST_ADDRESS), PORT);
}
} else {
// 从机仅广播本机信息
QByteArray ba = simplify(mJsonDevInfo);
mSocket->writeDatagram(ba, QHostAddress(BROADCAST_ADDRESS), PORT);
}
// 发出信号通知UI更新
emit signalDevInfo(bHost, mapDeviceInfo);
}
3. 设备状态同步机制
系统通过UDP广播实现设备间状态同步,确保所有设备都能获取最新的系统状态。
关键代码实现:
void MasterServer::analysisJsonShowInfo(const QByteArray& array, const QHostAddress& addr)
{
QJsonObject json = QJsonDocument::fromJson(array).object();
int priority = json.value("priority").toInt();
bool host = json.value("hostServer").toBool();
bool online = json.value("onLine").toBool();
// 更新设备信息
mapDeviceInfo[priority].addrIp = json.value("ip").toString();
mapDeviceInfo[priority].priority = priority;
mapDeviceInfo[priority].hostServer = host;
mapDeviceInfo[priority].onLine = online;
// 更新其他设备数据...
// 如果是本机消息,更新接收时间
#ifdef ARM_LINUX
if(addr.toString() == json.value("ip").toString())
#endif
{
mapDeviceInfo[priority].lastReciTime = QDateTime::currentSecsSinceEpoch();
mapDeviceInfo[priority].dropTimes = 0;
}
// 如果只有本机在线,自动成为主机
if(mapDeviceInfo.size() == 1 && priority == mPriority) {
mapDeviceInfo[priority].hostServer = true;
bHost = true;
} else {
// 否则进行主机选举
voteHostServer();
}
// 通知UI更新从机信息
emit signalSlaveDevInfoUi(mapDeviceInfo);
}
基于优先级的分布式主从架构,工作流程分为四个阶段:1)初始化阶段各设备确定自身优先级并监听网络;2)选举阶段通过优先级比较自动选出主机;3)运行阶段主机协调系统、广播全局状态,从机上报自身状态;4)容错阶段实时监测设备状态,主机故障时立即触发重新选举,从机掉线时自动更新拓扑。系统核心优势体现在三个方面:高可用性方面,采用无单点故障设计,秒级故障转移确保服务连续性;状态一致性方面,通过UDP广播和主机维护全局视图保证数据同步;灵活性方面,支持优先级动态配置和网络参数调优,适应不同规模场景。此外,高效的UDP通信机制和精简的数据封装保证了系统在资源受限环境下的稳定运行。