socket编程-UDP(3)-聊天室系统

发布于:2025-08-03 ⋅ 阅读:(17) ⋅ 点赞:(0)

socket编程-UDP(3)-聊天室系统

Mutex.hpp

#pragma once         // 防止头文件重复包含
#include <iostream>  // 标准输入输出(实际未使用,可移除)
#include <pthread.h> // POSIX线程库,提供互斥锁原语

namespace MutexModule
{
    class Mutex
    {
    public:
        Mutex()
        {
            pthread_mutex_init(&_mutex, nullptr); // 初始化互斥锁,nullptr表示默认属性
        }
        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex); // 阻塞直到获取锁
            (void)n;                             // 忽略返回值(实际工程中应检查错误)
        }
        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex); // 释放锁
            (void)n;
        }
        ~Mutex()
        {
            pthread_mutex_destroy(&_mutex);
        }
         pthread_mutex_t *Get()
        {
            return &_mutex;
        }
    private:
        pthread_mutex_t _mutex;
    };
    class LockGuard
    {
        public:
        LockGuard(Mutex &mutex):_mutex(mutex)
        {
            _mutex.Lock();// 构造时自动加锁
        }
        ~LockGuard()
        {
            _mutex.Unlock();
        }
        private:
            Mutex &_mutex;// 引用形式的Mutex对象(避免拷贝问题)
    };

多线程:

Cond.hpp

#pragma once

#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

using namespace MutexModule;

namespace CondModule
{
    class Cond
    {
    public:
        Cond()
        {
            pthread_cond_init(&_cond, nullptr);
        }
        void Wait(Mutex &mutex)//线程等待条件
        {
            int n = pthread_cond_wait(&_cond, mutex.Get());// 原子操作:解锁mutex并等待
            (void)n;
        }
        void Signal()//唤醒单个线程
        {
            // 唤醒在条件变量下等待的一个线程
            int n = pthread_cond_signal(&_cond);// 唤醒至少一个等待线程
            (void)n;
        }
        void Broadcast()//唤醒所有线程
        {
            // 唤醒所有在条件变量下等待的线程
            int n = pthread_cond_broadcast(&_cond);// 唤醒所有等待线程
            (void)n;
        }
        ~Cond()
        {
            pthread_cond_destroy(&_cond);
        }
    private:
        pthread_cond_t _cond;// POSIX原生条件变量
    };
};

Log.hpp

#ifndef __LOG_HPP__
#define __LOG_HPP__

#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> //C++17
#include <sstream>
#include <fstream>
#include <memory>
#include <ctime>
#include <unistd.h>
#include "Mutex.hpp"

namespace LogModule
{
    using namespace MutexModule;

    const std::string gsep = "\r\n";
    // 策略模式,C++多态特性
    // 2. 刷新策略 a: 显示器打印 b:向指定的文件写入
    //  刷新策略基类
    class LogStrategy
    {
    public:
        ~LogStrategy() = default;
        virtual void SyncLog(const std::string &message) = 0;
    };

    // 显示器打印日志的策略 : 子类
    class ConsoleLogStrategy : public LogStrategy
    {
    public:
        ConsoleLogStrategy()
        {
        }
        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::cout << message << gsep;
        }
        ~ConsoleLogStrategy()
        {
        }

    private:
        Mutex _mutex;
    };

    // 文件打印日志的策略 : 子类
    const std::string defaultpath = "./log";
    const std::string defaultfile = "my.log";
    class FileLogStrategy : public LogStrategy
    {
    public:
        FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile)
            : _path(path),
              _file(file)
        {
            LockGuard lockguard(_mutex);
            if (std::filesystem::exists(_path))
            {
                return;
            }
            try
            {
                std::filesystem::create_directories(_path);
            }
            catch (const std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << '\n';
            }
        }
        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);

            std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file; // "./log/" + "my.log"
            std::ofstream out(filename, std::ios::app);                              // 追加写入的 方式打开
            if (!out.is_open())
            {
                return;
            }
            out << message << gsep;
            out.close();
        }
        ~FileLogStrategy()
        {
        }

    private:
        std::string _path; // 日志文件所在路径
        std::string _file; // 日志文件本身
        Mutex _mutex;
    };

    // 形成一条完整的日志&&根据上面的策略,选择不同的刷新方式

    // 1. 形成日志等级
    enum class LogLevel
    {
        DEBUG,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };
    std::string Level2Str(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::DEBUG:
            return "DEBUG";
        case LogLevel::INFO:
            return "INFO";
        case LogLevel::WARNING:
            return "WARNING";
        case LogLevel::ERROR:
            return "ERROR";
        case LogLevel::FATAL:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }
    //时间戳生成
    std::string GetTimeStamp()
    {
        // 输出格式:2023-08-20 14:30:45
        time_t curr = time(nullptr);
        struct tm curr_tm;
        localtime_r(&curr, &curr_tm);
        char timebuffer[128];
        snprintf(timebuffer, sizeof(timebuffer),"%4d-%02d-%02d %02d:%02d:%02d",
            curr_tm.tm_year+1900,
            curr_tm.tm_mon+1,
            curr_tm.tm_mday,
            curr_tm.tm_hour,
            curr_tm.tm_min,
            curr_tm.tm_sec
        );
        return timebuffer;
    }

    // 1. 形成日志 && 2. 根据不同的策略,完成刷新
    class Logger
    {
    public:
        Logger()
        {
            EnableConsoleLogStrategy();
        }
        void EnableFileLogStrategy()
        {
            _fflush_strategy = std::make_unique<FileLogStrategy>();
        }
        void EnableConsoleLogStrategy()
        {
            _fflush_strategy = std::make_unique<ConsoleLogStrategy>();
        }

        // 表示的是未来的一条日志
        class LogMessage
        {
        public:
            LogMessage(LogLevel &level, std::string &src_name, int line_number, Logger &logger)
                : _curr_time(GetTimeStamp()),
                  _level(level),
                  _pid(getpid()),
                  _src_name(src_name),
                  _line_number(line_number),
                  _logger(logger)
            {
                // 日志的左边部分,合并起来
                std::stringstream ss;
                ss << "[" << _curr_time << "] "
                   << "[" << Level2Str(_level) << "] "
                   << "[" << _pid << "] "
                   << "[" << _src_name << "] "
                   << "[" << _line_number << "] "
                   << "- ";
                _loginfo = ss.str();
            }
            // LogMessage() << "hell world" << "XXXX" << 3.14 << 1234
            template <typename T>
            LogMessage &operator<<(const T &info)
            {
                // a = b = c =d;
                // 日志的右半部分,可变的
                std::stringstream ss;
                ss << info;
                _loginfo += ss.str();
                return *this;
            }

            ~LogMessage()
            {
                if (_logger._fflush_strategy)
                {
                    _logger._fflush_strategy->SyncLog(_loginfo);
                }
            }

        private:
            std::string _curr_time;
            LogLevel _level;
            pid_t _pid;
            std::string _src_name;
            int _line_number;
            std::string _loginfo; // 合并之后,一条完整的信息
            Logger &_logger;
        };

        // 这里故意写成返回临时对象
        LogMessage operator()(LogLevel level, std::string name, int line)
        {
            return LogMessage(level, name, line, *this);
        }
        ~Logger()
        {
        }

    private:
        std::unique_ptr<LogStrategy> _fflush_strategy;
    };

    // 全局日志对象
    Logger logger;

    // 使用宏,简化用户操作,获取文件名和行号
    #define LOG(level) logger(level, __FILE__, __LINE__)
    #define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()
    #define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
}

#endif

InetAddr.hpp

#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>

class InetAddr
{
    public:
     InetAddr(struct sockaddr_in &addr) : _addr(addr)
    {
        _port = ntohs(_addr.sin_port);           // 从网络中拿到的!网络序列
        _ip = inet_ntoa(_addr.sin_addr); // 4字节网络风格的IP -> 点分十进制的字符串风格的IP
    }
    uint16_t Port() {return _port;}
    std::string Ip() {return _ip;}
    const struct sockaddr_in &NetAddr() { return _addr; }
     bool operator==(const InetAddr &addr)
    {
        return addr._ip == _ip && addr._port == _port;
    }
    std::string StringAddr()
    {
        return _ip + ":" + std::to_string(_port);
    }
    ~InetAddr()
    {}
    private:
        struct sockaddr_in _addr;//原始网络地址结构体
        std::string _ip;//字符串格式的IP地址
        uint16_t _port;//主机字节序的端口号
};

Route.hpp

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include "InetAddr.hpp"
#include "Log.hpp"

using namespace LogModule;
class Route
{
private:
    bool IsExist(InetAddr &peer)
    {
        for (auto &user : _online_user)
        {
            if (user == peer)
            {
                return true;
            }
        }
        return false;
    }
    void AddUser(InetAddr &peer)
    {
        LOG(LogLevel::INFO) << "新增一个在线用户: " << peer.StringAddr();
        _online_user.push_back(peer);
    }

    void DeleteUser(InetAddr &peer)
    {
        for (auto iter = _online_user.begin(); iter != _online_user.end(); iter++)
        {
            if (*iter == peer)
            {
                LOG(LogLevel::INFO) << "删除一个在线用户:" << peer.StringAddr() << "成功";
                _online_user.erase(iter);
                break;
            }
        }
    }

public:
    Route()
    {
    }
    void MessageRoute(int sockfd, const std::string &message, InetAddr &peer)
    {
        // 1. 新用户自动注册
        if (!IsExist(peer))
        {
            AddUser(peer);
        }
        // 2. 构造带地址前缀的消息
        std::string send_message = peer.StringAddr() + "# " + message; // 127.0.0.1:8080# 你好

        // 3. 广播给所有在线用户
        for (auto &user : _online_user)
        {
            sendto(sockfd, send_message.c_str(), send_message.size(), 0, (const struct sockaddr *)&(user.NetAddr()), sizeof(user.NetAddr()));
        }
        // 4. 处理退出指令
        // 这个用户一定已经在线了
        if (message == "QUIT")
        {
            LOG(LogLevel::INFO) << "删除一个在线用户: " << peer.StringAddr();
            DeleteUser(peer);
        }
    }
    ~Route()
    {
    }

private:
    // 首次给我发消息,等同于登录
    std::vector<InetAddr> _online_user; // 在线用户
};

Thread.hpp

#ifndef _THREAD_H_
#define _THREAD_H_
// ...(标准库和POSIX线程头文件)
#include <iostream>
#include <string>
#include <pthread.h>
#include <cstdio>
#include <cstring>
#include <functional>

namespace ThreadModlue
{
    static uint32_t number = 1; // 用于生成线程名称的计数器

    class Thread
    {
        using func_t = std::function<void()>; // 线程执行的任务(无参无返回)
    private:
        void EnableDetach()
        {
            _isdetach = true;
        }
        void EnableRunning()
        {
            _isrunning = true;
        }
        static void *Routine(void *args) // 属于类内的成员函数,默认包含this指针!
        {
            Thread *self = static_cast<Thread *>(args);// 将void*转为Thread对象指针
            self->EnableRunning();
            if (self->_isdetach)
                self->Detach(); // 若需分离,调用Detach
            pthread_setname_np(self->_tid, self->_name.c_str());// 设置线程名(Linux特有)
            self->_func(); // 执行用户任务

            return nullptr;
        }
        // bug
    public:
        Thread(func_t func)// 生成唯一线程名
            : _tid(0),
              _isdetach(false),
              _isrunning(false),
              res(nullptr),
              _func(func)
        {
            _name = "thread-" + std::to_string(number++);
        }
        void Detach()
        {
            if (_isdetach)
                return;
            if (_isrunning)
                pthread_detach(_tid);// 分离运行中的线程
            EnableDetach();
        }

        bool Start()
        {
            if (_isrunning)
                return false;// 避免重复启动
            int n = pthread_create(&_tid, nullptr, Routine, this);// 创建线程
            if (n != 0)
            {
                return false;
            }
            else
            {
                return true;
            }
        }
        bool Stop()
        {
            if (_isrunning)
            {
                int n = pthread_cancel(_tid);//强制终止线程
                if (n != 0)
                {
                    return false;
                }
                else
                {
                    _isrunning = false;
                    return true;
                }
            }
            return false;
        }
        void Join()
        {
            if (_isdetach)
            {
                return;// 分离线程不可Join
            }
            int n = pthread_join(_tid, &res);// 阻塞等待线程结束
            if (n != 0)
            {
            }
            else
            {
            }
        }
        pthread_t Id()
        {
            return _tid;
        }
        ~Thread()
        {
        }

    private:
        pthread_t _tid;//线程ID
        std::string _name;//线程名称
        bool _isdetach;//标记线程是否为分离状态
        bool _isrunning;//标记线程是否在运行
        void *res;//存储线程返回值
        func_t _func;//用户定义的任务函数
    };
}

#endif

ThreadPool.hpp

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"

// .hpp header only

namespace ThreadPoolModule
{
    using namespace ThreadModlue;
    using namespace LogModule;
    using namespace CondModule;
    using namespace MutexModule;

    static const int gnum = 5;
    template <typename T>
    class ThreadPool
    {
    private:
        void WakeUpAllThread()
        {
            LockGuard lockguard(_mutex);
            if (_sleepernum)
                _cond.Broadcast();
            LOG(LogLevel::INFO) << "唤醒所有的休眠线程";
        }

        void WakeUpOne()
        {
            _cond.Signal();
            LOG(LogLevel::INFO) << "唤醒一个休眠线程";
        }

        ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0)
        {
            for (int i = 0; i < num; i++)
            {
                _threads.emplace_back(
                    [this]()
                    {
                        HandlerTask();
                    });
            }
        }
        void Start()
        {
            if (_isrunning)
                return;
            _isrunning = true;
            for (auto &thread : _threads)
            {
                thread.Start();
                LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();
            }
        }

        ThreadPool(const ThreadPool<T> &) = delete;
        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;

    public:
    //单例模式:全局唯一线程池实例
        static ThreadPool<T> *GetInstance()
        {
            if (inc == nullptr)// 双重检查锁定
            {
                LockGuard lockguard(_lock);

                LOG(LogLevel::DEBUG) << "获取单例....";
                if (inc == nullptr)
                {
                    LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";
                    inc = new ThreadPool<T>();// 懒加载初始化:首次调用时创建线程池。
                    inc->Start();
                }
            }

            return inc;
        }
        void Stop()
        {
            if (!_isrunning)
                return;
            _isrunning = false;

            // 唤醒所有的线层
            WakeUpAllThread();
        }
        void Join()
        {
            for (auto &thread : _threads)
            {
                thread.Join();
            }
        }
        //生产者-消费者:主线程投递任务(Enqueue),工作线程(HandlerTask)从队列取任务执行。
        void HandlerTask()
        {
            char name[128];
            pthread_getname_np(pthread_self(), name, sizeof(name));
             // 条件等待:队列空且线程池运行中
            while (true)
            {
                T t;
                {
                    LockGuard lockguard(_mutex);
                    // 1. a.队列为空 b. 线程池没有退出
                    while (_taskq.empty() && _isrunning)
                    {
                        _sleepernum++;
                        _cond.Wait(_mutex);// 自动释放锁并休眠
                        _sleepernum--;
                    }
                    // 退出条件:线程池关闭且队列空
                    // 2. 内部的线程被唤醒
                    if (!_isrunning && _taskq.empty())
                    {
                        LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";
                        break;
                    }

                    // 一定有任务
                    // 取任务
                    t = _taskq.front(); // 从q中获取任务,任务已经是线程私有的了!!!
                    _taskq.pop();
                }// 锁作用域结束,自动释放
                t(); // 处理任务,需/要在临界区内部处理吗?1 0
            }
        }
        bool Enqueue(const T &in)
        {
            if (_isrunning)
            {
                LockGuard lockguard(_mutex);
                _taskq.push(in);
                if (_threads.size() == _sleepernum)
                    WakeUpOne();// 有休眠线程才唤醒
                return true;
            }
            return false;// 线程池未运行
        }
        ~ThreadPool()
        {
        }

    private:
        std::vector<Thread> _threads;//工作线程集合
        int _num; // 线程池中,线程的个数
        std::queue<T> _taskq;//任务队列(存储可调用对象)
        Cond _cond;//控制线程休眠与唤醒
        Mutex _mutex;//保护任务队列和状态变量

        bool _isrunning;//线程池运行状态标志
        int _sleepernum;

        // bug??
        static ThreadPool<T> *inc; // 单例指针
        static Mutex _lock;//当前休眠的线程数(优化唤醒策略)
    };

    template <typename T>
    ThreadPool<T> *ThreadPool<T>::inc = nullptr;

    template <typename T>
    Mutex ThreadPool<T>::_lock;

}

UdpSever.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "InetAddr.hpp"

using namespace LogModule;

using func_t = std::function<void(int sockfd, const std::string&, InetAddr&)>;

const int defaultfd = -1;

class UdpServer
{
    public:
    UdpServer(uint16_t port, func_t func)
    : _sockfd(defaultfd),
    _port(port),
    _isrunning(false),
    _func(func)
    {}
     void Init()
        {
            // 1. 创建UDP套接字
            _sockfd=socket(AF_INET, SOCK_DGRAM, 0);
            if(_sockfd<0)
            {
                LOG(LogLevel::FATAL) << "socket error!";
                exit(1);
            }
            LOG(LogLevel::INFO) << "socket success, sockfd : " << _sockfd;
            // 2. 绑定地址(INADDR_ANY监听所有网卡)
            struct sockaddr_in local;
            bzero(&local, sizeof(local));
            local.sin_family = AF_INET;
            local.sin_port = htons(_port);       // 端口转网络字节序
            local.sin_addr.s_addr = INADDR_ANY;  // 监听所有IP
            int n=bind(_sockfd, (struct sockaddr*)&local, sizeof(local));
            if (n < 0)
            {
            LOG(LogLevel::FATAL) << "bind error";
            exit(2);
             }
        LOG(LogLevel::INFO) << "bind success, sockfd : " << _sockfd;
        }
        void Start() {
    while (_isrunning) {
        char buffer[1024];
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        
        // 接收消息
        ssize_t s = recvfrom(_sockfd, buffer, sizeof(buffer)-1, 0, 
                            (struct sockaddr*)&peer, &len);
        if (s > 0) {
            buffer[s] = 0;  // 添加字符串结束符
            InetAddr client(peer);  // 封装客户端地址
            _func(_sockfd, std::string(buffer), client);  // 调用回调函数
        }
    }
}
    ~UdpServer()
    {}
    private:
        int _sockfd;//sockfd:服务端套接字(用于回复消息)
        uint16_t _port;
        bool _isrunning;
        func_t _func; // 服务器的回调函数,用来进行对数据进行处理
};

UdpServer.cc

#include <iostream>
#include <memory>
#include "Route.hpp"
#include "UdpServer.hpp" // 网络通信的功能
#include "ThreadPool.hpp"

using namespace ThreadPoolModule;

using task_t = std::function<void()>;

int main(int argc, char *argv[]) {
    // 参数检查
    uint16_t port = std::stoi(argv[1]);  // 从命令行获取端口

    // 1. 初始化路由服务
    Route r;

    // 2. 获取线程池单例
    auto tp = ThreadPool<task_t>::GetInstance();

    // 3. 创建UDP服务器,绑定Lambda回调
    std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(port, 
        [&r, &tp](int sockfd, const std::string &message, InetAddr& peer) {
            // 将路由任务封装为函数对象,提交到线程池
            task_t t = std::bind(&Route::MessageRoute, &r, sockfd, message, peer);
            tp->Enqueue(t);
        }
    );

    usvr->Init();  // 初始化套接字
    usvr->Start(); // 启动主循环
}

UdpClient.cc

#include <iostream>
#include <string>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "Thread.hpp"

int sockfd = 0;                  // UDP套接字描述符
std::string server_ip;           // 服务器IP
uint16_t server_port = 0;        // 服务器端口
pthread_t id;                    // 接收线程ID(用于终止线程)
using namespace ThreadModlue;
void Recv() {
    while (true) {
        char buffer[1024];
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        // 阻塞接收数据
        int m = recvfrom(sockfd, buffer, sizeof(buffer)-1, 0, (struct sockaddr*)&peer, &len);
        if (m > 0) {
            buffer[m] = 0;  // 添加字符串结束符
            std::cerr << buffer << std::endl;  // 打印服务器响应
        }
    }
}

void Send() {
    // 配置服务器地址
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(server_port);       // 端口转网络字节序
    server.sin_addr.s_addr = inet_addr(server_ip.c_str());  // IP转网络字节序

    // 发送上线通知
    sendto(sockfd, "inline", 6, 0, (struct sockaddr*)&server, sizeof(server));

    while (true) {
        std::string input;
        std::cout << "Please Enter# ";
        std::getline(std::cin, input);  // 读取用户输入

        // 发送消息
        sendto(sockfd, input.c_str(), input.size(), 0, (struct sockaddr*)&server, sizeof(server));

        if (input == "QUIT") {
            pthread_cancel(id);  // 终止接收线程
            break;
        }
    }
}

int main(int argc, char *argv[]) {
    if (argc != 3)
    {
        std::cerr << "Usage: " << argv[0] << " server_ip server_port" << std::endl;
        return 1;
    }
    // 参数检查
    server_ip = argv[1];
    server_port = std::stoi(argv[2]);

    // 创建UDP套接字
    sockfd = socket(AF_INET, SOCK_DGRAM, 0);
     if (sockfd < 0)
    {
        std::cerr << "socket error" << std::endl;
        return 2;
    }

    // 启动收发线程
    Thread recver(Recv);
    Thread sender(Send);
    recver.Start();
    sender.Start();
    id = recver.Id();  // 保存接收线程ID

    // 等待线程结束
    recver.Join();
    sender.Join();
}

网站公告

今日签到

点亮在社区的每一天
去签到