Linux利用多线程和线程同步实现一个简单的聊天服务器

发布于:2025-05-20 ⋅ 阅读:(20) ⋅ 点赞:(0)

1. 概述

本文实现一个基于TCP/IP的简单多人聊天室程序。它包含一个服务器端和一个客户端:服务器能够接收多个客户端的连接,并将任何一个客户端发来的消息广播给所有其他连接的客户端;客户端则可以连接到服务器,发送消息并接收来自其他人的消息。该Demo运用了网络编程(Socket API)、多线程(Pthreads)以及线程同步(互斥锁)技术,以实现并发处理和数据共享安全。


2. 核心技术

  • 网络编程(Sockets)

    • TCP/IP: 选择面向连接的TCP协议,保证数据传输的可靠性。
    • 服务器端流程:
      1. socket(): 创建套接字。
      2. memset()/struct sockaddr_in: 配置服务器地址和端口。
      3. bind(): 绑定套接字到指定地址和端口。
      4. listen(): 设置套接字为监听状态,等待连接。
      5. accept(): 接受客户端连接,为每个连接创建一个新的套接字。
    • 客户端流程:
      1. socket(): 创建套接字。
      2. memset()/struct sockaddr_in: 配置服务器地址和端口。
      3. connect(): 连接到服务器。
    • 数据传输: read()write() 用于双向通信。
  • 多线程 (Pthreads)

    • 服务器端:
      • 主线程负责 accept() 连接。
      • 每接受一个新客户端,使用 pthread_create() 创建一个新的处理线程 (handle_clnt)。
      • 使用 pthread_detach() 将子线程设置为分离状态,使其结束后资源能自动回收,主线程无需 join
    • 客户端:
      • 创建两个核心线程
        • send_msg 线程:负责获取用户键盘输入并将其发送到服务器。
        • recv_msg 线程:负责接收服务器广播的消息并显示在控制台。
      • 这种设计使得用户输入和消息接收可以并行进行,互不阻塞
  • 线程同步 (Mutex)

    • 场景: 服务器端多个 handle_clnt 线程会并发访问和修改共享资源(如客户端套接字数组 clnt_socks 和当前客户端计数 clnt_cnt)。
    • 机制: 使用互斥锁 (mutx) 保护这些临界区。
      • pthread_mutex_init(): 初始化互斥锁。
      • pthread_mutex_lock(): 在访问共享资源前加锁。
      • pthread_mutex_unlock(): 访问完毕后解锁。
    • 关键操作加锁:
      • 添加新客户端到 clnt_socks
      • clnt_socks 移除断开连接的客户端。
      • send_msg (服务器端广播函数) 遍历 clnt_socks 时。

3. 主要模块实现

A. 服务器端 (server)
  • main() 函数:
    • 参数解析 (端口号)。
    • 初始化互斥锁。
    • 完成socket的创建、绑定、监听。
    • 进入无限循环,通过 accept() 接收客户端连接。
    • 为每个连接创建 handle_clnt 线程并分离。
  • handle_clnt(void* arg) 函数:
    • 获取传递过来的客户端套接字。
    • 循环调用 read() 接收该客户端的消息。
    • read() 成功,则调用 send_msg() (服务器的) 广播此消息。
    • read() 返回0 (客户端关闭连接),则执行清理:加锁 -> 从 clnt_socks 移除 -> clnt_cnt-- -> 解锁 -> close() 该客户端套接字。
  • send_msg(char* msg, int len) 函数 (服务器端):
    • 加锁。
    • 遍历 clnt_socks 数组,将消息 write() 给每一个已连接的客户端。
    • 解锁。
B. 客户端 (client)
  • main() 函数:
    • 参数解析 (服务器IP, 端口号, 用户名)。
    • 创建socket并 connect() 到服务器。
    • 创建 send_msgrecv_msg 两个线程。
    • pthread_join() 等待这两个线程结束(虽然当前 send_msg 中的 exit(0) 会提前终止)。
  • send_msg(void* arg) 函数:
    • 循环获取用户标准输入 (fgets)。
    • 检测到 "q" 或 "Q" 时,close(sock)exit(0) (可改进点)。
    • 将用户名和消息格式化后通过 write() 发送给服务器。
  • recv_msg(void* arg) 函数:
    • 循环调用 read() 从服务器接收消息。
    • 将接收到的消息 fputs() 到标准输出。

4. 总结

  • 互斥锁的必要性: 在多线程环境下,若不使用同步机制保护共享数据,会导致数据竞争和不可预期的结果。clnt_socksclnt_cnt 的并发修改是典型场景。
  • 线程分离 vs. 等待: 服务器端 pthread_detach 的使用简化了主线程的管理,适用于这种“即发即忘”的独立工作单元。客户端 pthread_join 的意图是等待线程完成,但需配合更优雅的线程退出信号。
  • 阻塞I/O与多线程: 每个客户端一个线程,每个线程中的 read() 是阻塞的。这简化了单个线程的逻辑,但当连接数非常大时,线程资源开销会成为瓶颈。
  • 客户端非阻塞体验: 通过发送和接收分离到不同线程,客户端用户体验得到了提升,不会因为等待网络消息而卡住输入。
  • 基本通信协议: 客户端在发送消息前简单地将用户名预置到消息体中,服务器直接转发这个消息体。这是一个非常初级的“协议”。

 具体代码如下:

 服务端代码:网络编程 + 多线程 + 线程同步

// 网络编程+多线程+线程同步实现的聊天服务器和客户端

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>

#define BUF_SIZE 100  // 定义缓冲区大小
#define MAX_CLNT 256  // 最大客户端数量

// 函数声明
void * handle_clnt(void * arg);  // 处理客户端连接的线程函数
void send_msg(char * msg, int len);  // 向所有客户端发送消息
void error_handling(char * msg);  // 错误处理函数

int clnt_cnt = 0;  // 当前客户端连接数量
int clnt_socks[MAX_CLNT];  // 存储所有客户端的socket描述符
pthread_mutex_t mutx;  // 互斥锁,用于同步对共享资源的访问(客户端数组)

int main(int argc, char *argv[])
{
    int serv_sock, clnt_sock;  // 服务端socket和客户端socket
    struct sockaddr_in serv_adr, clnt_adr;  // 服务端和客户端地址
    int clnt_adr_sz;  // 客户端地址结构的大小
    pthread_t t_id;  // 线程ID

    if(argc != 2) {
        printf("Usage : %s <port>\n", argv[0]);  // 检查输入的端口号参数
        exit(1);
    }
    
    pthread_mutex_init(&mutx, NULL);  // 初始化互斥锁
    
    serv_sock = socket(PF_INET, SOCK_STREAM, 0);  // 创建服务端socket
    if(serv_sock == -1) {
        error_handling("socket() error");
    }
    
    memset(&serv_adr, 0, sizeof(serv_adr));  // 初始化服务端地址结构
    serv_adr.sin_family = AF_INET;
    serv_adr.sin_addr.s_addr = htonl(INADDR_ANY);  // 绑定到所有可用接口
    serv_adr.sin_port = htons(atoi(argv[1]));  // 使用命令行提供的端口号
    
    if(bind(serv_sock, (struct sockaddr*)&serv_adr, sizeof(serv_adr)) == -1)  // 绑定服务端socket
        error_handling("bind() error");
    if(listen(serv_sock, 5) == -1)  // 开始监听
        error_handling("listen() error");
    
    while(1)
    {
        clnt_adr_sz = sizeof(clnt_adr);  // 获取客户端地址大小
        clnt_sock = accept(serv_sock, (struct sockaddr*)&clnt_adr, &clnt_adr_sz);  // 接受客户端连接
    
        // 添加新的客户端socket到数组
        pthread_mutex_lock(&mutx);  // 获取互斥锁,确保线程安全
        clnt_socks[clnt_cnt++] = clnt_sock;  // 增加客户端到客户端数组
        pthread_mutex_unlock(&mutx);  // 释放互斥锁
    
        // 创建新线程来处理客户端
        pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);
        pthread_detach(t_id);  // 将线程分离,避免主线程等待
        printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));  // 输出客户端IP地址
    }
    
    close(serv_sock);  // 关闭服务端socket
    return 0;

}

// 处理客户端的函数
void * handle_clnt(void * arg)
{
    int clnt_sock = *((int*)arg);  // 获取客户端socket
    int str_len = 0, i;
    char msg[BUF_SIZE];  // 缓冲区

    while((str_len = read(clnt_sock, msg, sizeof(msg))) != 0)  // 读取客户端发送的消息
        send_msg(msg, str_len);  // 将消息转发给所有客户端
    
    // 客户端断开连接后,移除客户端
    pthread_mutex_lock(&mutx);  // 获取互斥锁
    for(i = 0; i < clnt_cnt; i++)  // 查找并移除断开的客户端
    {
        if(clnt_sock == clnt_socks[i])
        {
            while(i++ < clnt_cnt - 1)  // 将后续客户端前移
                clnt_socks[i] = clnt_socks[i + 1];
            break;
        }
    }
    clnt_cnt--;  // 客户端数量减一
    pthread_mutex_unlock(&mutx);  // 释放互斥锁
    
    close(clnt_sock);  // 关闭客户端socket
    return NULL;

}

// 向所有客户端发送消息
void send_msg(char * msg, int len)
{
    int i;
    pthread_mutex_lock(&mutx);  // 获取互斥锁,保护共享资源(客户端socket数组)
    for(i = 0; i < clnt_cnt; i++)  // 向所有连接的客户端发送消息
        write(clnt_socks[i], msg, len);
    pthread_mutex_unlock(&mutx);  // 释放互斥锁
}

// 错误处理函数
void error_handling(char * msg)
{
    fputs(msg, stderr);  // 输出错误信息
    fputc('\n', stderr);
    exit(1);  // 退出程序
}

客户端代码:网络编程 + 多线程

// 客户端程序:网络编程+多线程实现的聊天客户端

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> 
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>

#define BUF_SIZE 100  // 定义消息的最大长度
#define NAME_SIZE 20  // 定义用户名的最大长度

// 函数声明
void * send_msg(void * arg);  // 发送消息的线程函数
void * recv_msg(void * arg);  // 接收消息的线程函数
void error_handling(char * msg);  // 错误处理函数

// 用户名和消息缓冲区
char name[NAME_SIZE] = "[DEFAULT]";  // 默认用户名
char msg[BUF_SIZE];  // 用于存储用户输入的消息

int main(int argc, char *argv[])
{
    int sock;
    struct sockaddr_in serv_addr;  // 服务器地址结构
    pthread_t snd_thread, rcv_thread;  // 发送和接收消息的线程
    void * thread_return;

    // 检查命令行参数,确保提供了 IP、端口和用户名
    if(argc != 4) {
        printf("Usage : %s <IP> <port> <name>\n", argv[0]);
        exit(1);
    }
    
    // 设置客户端用户名
    sprintf(name, "[%s]", argv[3]);
    
    // 创建客户端socket
    sock = socket(PF_INET, SOCK_STREAM, 0);
    
    // 初始化服务器地址结构
    memset(&serv_addr, 0, sizeof(serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = inet_addr(argv[1]);  // 获取服务器的IP地址
    serv_addr.sin_port = htons(atoi(argv[2]));  // 获取服务器的端口号
    
    // 连接到服务器
    if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1)
        error_handling("connect() error");
    
    // 创建发送和接收消息的线程
    pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);
    pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);
    
    // 等待两个线程结束
    pthread_join(snd_thread, &thread_return);
    pthread_join(rcv_thread, &thread_return);
    
    close(sock);  // 关闭客户端socket
    return 0;

}

// 发送消息的线程函数
void * send_msg(void * arg)
{
    int sock = *((int*)arg);  // 获取客户端socket
    char name_msg[NAME_SIZE + BUF_SIZE];  // 用于存储带有用户名的消息

    while(1) 
    {
        fgets(msg, BUF_SIZE, stdin);  // 获取用户输入的消息
    
        // 如果输入为 "q" 或 "Q",则退出程序
        if(!strcmp(msg, "q\n") || !strcmp(msg, "Q\n")) 
        {
            close(sock);  // 关闭socket连接
            exit(0);  // 退出程序
        }
    
        // 将用户名和消息合并成一个字符串
        sprintf(name_msg, "%s %s", name, msg);
    
        // 发送合并后的消息到服务器
        write(sock, name_msg, strlen(name_msg));
    }
    return NULL;  // 返回空值

}

// 接收消息的线程函数
void * recv_msg(void * arg)
{
    int sock = *((int*)arg);  // 获取客户端socket
    char name_msg[NAME_SIZE + BUF_SIZE];  // 用于存储带有用户名的消息
    int str_len;

    while(1)
    {
        // 从服务器读取消息
        str_len = read(sock, name_msg, NAME_SIZE + BUF_SIZE - 1);
        
        if(str_len == -1)  // 如果读取失败,返回错误
            return (void*)-1;
    
        name_msg[str_len] = 0;  // 将读取的字符串以 null 结尾
        fputs(name_msg, stdout);  // 输出服务器发来的消息
    }
    return NULL;  // 返回空值

}

// 错误处理函数
void error_handling(char *msg)
{
    fputs(msg, stderr);  // 将错误消息输出到标准错误
    fputc('\n', stderr);  // 输出换行符
    exit(1);  // 退出程序
}


网站公告

今日签到

点亮在社区的每一天
去签到