Linux/Unix线程及其同步(create、wait、exit、互斥锁、条件变量、多线程)

发布于:2025-07-05 ⋅ 阅读:(16) ⋅ 点赞:(0)

线程

I 线程基本概念

线程是允许应用程序并发执行多个任务的一种机制;一个进程可以包含多个线程,统一程序中的线程会独立执行相同的程序,且共享同一份全局内存区域。

1、为什么引入线程
  • 进程间不共享内存,进程间的信息交换需要通过IPC(进程间通信)机制来实现;同进程的线程之间共享内存空间(包括堆、全局变量等),可直接通过读写共享内存来实现线程间高效通信

  • fork创建进程代价较高,涉及完整的地址空间复制、文件描述符表复制等资源开销;线程创建仅需少量寄存器设置和栈空间分配,共享进程资源

  • 线程切换开销远小于进程切换,因无需切换地址空间和刷新TLB

  • 多线程程序能更好利用多核CPU资源,实现真正的并行计算

  • 线程间通信延迟更低,适合需要频繁数据交互的场景
2、Pthreads

Posix统一了Pthreads线程接口的标准,提供了一套跨平台的线程创建、同步和管理API。

Pthread常用数据类型:

在这里插入图片描述

II 线程基本操作

1、创建线程

程序启动运行时只有一条主线程(main函数),可以使用pthread_creat()函数来创建新的子线程:

    #include <pthreads.h>
    int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                        void *(*start)(void *), void *arg);
    /*return 0 on success,or a positive errno on error*/
  • 该函数通过调用带有参数arg的函数start开始执行,新建的线程执行start函数里面的内容,调用pthread_create()的线程会继续执行后面的内容

  • 参数argvoid *类型,可以将指向任意对象的指针传递给start函数,需要传递多个参数时,可以将arg指向一个结构体

  • 参数thread指向pthread_t类型的缓冲区,在pthread_creat()返回之前,会在此保存该线程的唯一标识(ID),后续的pthread函数可以通过该表示来引用此线程

  • 参数attr指定了新创建线程的各种属性,设置为NULL,那么创建的线程使用各种默认属性
2、终止线程

终止线程有很多方式:

  • 线程函数start执行return语句并返回值之后线程终止

  • 调用pthread_exit()pthread_cancle()函数取消线程

  • 任意线程调用了exit(),或者主线程执行了return语句

pthread_exit终止线程:

    #include <pthread.h>
    void pthread_exit(void *retval);
  • 可以在线程的任何地方调用pthread_exit()来退出线程,与return功能相似

  • 参数retval中保存了线程的返回值,其所指向的内容不应该分配在线程栈中

  • 主线程调用pthread_exit()后,其他线程还会继续执行
3、线程ID

进程内部的每个线程都有唯一的标识,称为线程ID。在Linux中:

  • 线程ID在其所属进程内唯一标识一个线程

  • 不同进程中的线程可能具有相同的线程ID(由不同进程的线程ID命名空间隔离)

  • 系统范围内唯一的线程标识可通过pthread_self()结合进程ID实现

获取自己线程的ID:

    #include <pthread.h>
    pthread_t pthread_self(void);

检查两个线程的ID是否相同:

    #include <pthread.h>
    int pthread_equal(pthread_t t1, pthread_t t2);
    /*return nonezero value if t1 equal to t2, otherwise 0*/
4、连接已终止线程
  • 函数pthread_join()等待由thread标识的线程终止,如果线程已经终止则会立即返回
    #include <pthread.h>
    int pthread_join(pthread_t thread, void **retval);
    /*return 0 on success or a positive error number on error*/
  • 参数thread指定了需要连接的线程

  • retval为一非空指针,用于保存线程终止时返回值的拷贝(即线程return值或调用pthread_exit()所指定的值)

  • 该函数的功能与进程的waitpid()类似,但线程之间的关系是对等的,进程中的任意线程均可以通过该函数与进程中的任意线程建立连接
5、线程基本操作示例
    /* Thread handler function for pthread_create */
    void* handle_create(void *arg){
        pthread_t tid = pthread_self();
        printf("I am thread %ld\n",(long int)tid);

        /* Check if argument is NULL */
        char *str = (char*)arg;
        if(str == NULL){
            printf("nothing recived form str\n");
            pthread_exit((void*)-1);
        }

        /* Print received message */
        printf("message form pthread_create:\n%s\n",str);
        return (void*)0;
    }

    /* Main function to demonstrate thread creation and joining */
    int thread_pthread_func(int argc, char *argv[]){
        /* Check command line arguments */
        if(argc < 2){
            printf("Usage:%s <message to thread>\n", argv[0]);
            return -1;
        }

        /* Handle NULL argument case */
        char *str = (strcmp(argv[1],"NULL") == 0) ? NULL : argv[1]; 

        /* Create new thread */
        pthread_t pth;
        int err = pthread_create(&pth, NULL, handle_create, str);
        if(err != 0){
            printf("pthread_create error:%s\n", strerror(err));
            return -1;
        }

        /* Wait for thread to complete and get return value */
        void *ret = NULL ;
        int join = pthread_join(pth, &ret);
        if(join != 0){
            printf("pthread_join error: %s\n", strerror(join));
            return -1;
        }
        printf("thread %ld exit status: %ld\n", (long int)pth, (intptr_t)ret);

        return 0;
    }

III 通过互斥量同步线程

1、基本概念
  • 若多个线程共享同一资源(文件、变量、内存块等),当线程1需要读取这一资源的时候,正好线程2修改了这一资源的值,这时线程1读取到的资源值已被修改,可能不是预期的值,这可能会带来无法预期的结果

  • 当多个线程共享相同的资源时,需要确保它们访问这些资源时不会产生冲突或不一致的结果。线程同步就是协调多个线程的执行顺序,以确保数据的一致性和正确性。

  • 临界区是指访问同一共享资源的代码片段,并且这段代码的执行应为原子操作
2、互斥量(Mutex)
  • 为避免线程更新共享变量时出现问题,可通过互斥量来确保同一时刻仅有一个线程可以访问这个共享变量

  • 一个互斥量有两种状态,锁定状态和解锁状态,任何时候只有一个线程可以锁定同一互斥量,若有线程试图锁定已锁定的互斥量,该线程将阻塞,直到该互斥量变为解锁状态

  • 一旦线程锁定了某个互斥量,这个线程将成为该互斥量的所有者,只有所有者才能给锁定状态的互斥量解锁

  • 线程通过访问互斥量保护的共享资源时,遵循以下流程:

    针对共享资源锁定互斥量 --> 访问共享资源 --> 解锁互斥量

3、静态分配互斥量
  • 互斥量是pthread_mutex_t类型的变量,使用前必须对其进行初始化,初始化后的互斥量处于解锁状态
    pthrea_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
4、互斥量锁定与解锁
  • 可通过以下函数对互斥量进行锁定和解锁,参数mutex是需要解锁或锁定的互斥量
    #include <pthread.h>
    int pthread_mutex_lock(pthread_mutex_t *mutex);
    int pthread_mutex_unlock(pthread_mutex_t *mutex);
    /*return 0 on success or a positive error number on error*/
  • 通过pthread_mutex_lock()锁定互斥量时,若互斥量处于未锁定状态,则函数锁定该互斥量并立即返回;若互斥量处于锁定状态,则一直阻塞,当互斥量解锁时锁定互斥量并返回

  • 通过pthread_mutex_unlock()解锁互斥量时,不能对处于未锁定状态的互斥量进行解锁,不能解锁其他线程锁定的互斥量

  • 示例:多线程修改互斥量保护的全局变量
    /* Shared data protected by mutex */
    static long int data_mutex;
    /* Mutex for protecting data_mutex */
    static pthread_mutex_t mtx_lock_unlock = PTHREAD_MUTEX_INITIALIZER;

    /* Thread function that increments shared data with mutex protection */
    void *handler_mutex_lock_unlock(void *num){
        /* Convert argument to loop count */
        int loop = atoi((char*)num);
        int ret, tmp;

        /* Loop to increment shared data */
        for(int i = 0; i < loop; i++){
            /* Lock mutex before accessing shared data */
            ret = pthread_mutex_lock(&mtx_lock_unlock);
            if(ret != 0){
                printf("pthread_mutex_lock error :%s\n",strerror(ret));
                return (void*)-1;
            }

            /* Critical section: increment shared data */
            tmp = data_mutex;
            tmp++;
            data_mutex = tmp;

            /* Unlock mutex after accessing shared data */
            ret = pthread_mutex_unlock(&mtx_lock_unlock);
            if(ret != 0){
                printf("pthread_mutex_unlock error:%s\n", strerror(ret));
                return (void*)-1;
            }
        }
        return (void*)0;
    }

    /* Function to create multiple threads that increment shared data */
    int main(int argc, char *argv[]){
        /* Check command line arguments */
        if(argc < 2 || atoi(argv[1]) < 0){
            printf("Usage:%s <loop times>", argv[0]);
            return -1;
        }

        /* Create 5 threads */
        pthread_t thr[5];
        int errn;
        for(int i = 0; i < 5; i++){
            errn = pthread_create(&thr[i], NULL, handler_mutex_lock_unlock, (void*)argv[1]);
            if(errn != 0){
                printf("pthread_create error:%s\n", strerror(errn));
                return -1;
            }
        }

        /* Wait for all threads to complete */
        void *ret = NULL;
        for(int j = 0; j < 5; j++){
            if(pthread_join(thr[j], &ret) != 0){
                printf("pthread_join error\n");
                return -1;
            }
            if((intptr_t)ret != 0)
                printf("thread %ld exit error\n", (long int)thr[j]);
        }

        /* Print final value of shared data */
        printf("the value of \"data_mutex\" is %ld \n", data_mutex);
        return 0;
    }
5、互斥量的死锁
  • 多个线程在争夺资源时,因互相等待对方释放资源而陷入无限阻塞的状态,例如:

  • 有两个互斥量A和B,两个线程1和2,线程1锁定了A之后欲锁定B,但线程2要在锁定A之后才能解锁B,线程2锁定了B之后欲锁定A,但线程1要在锁定B之后才能解锁A,这样就会陷入无限阻塞状态

  • 为避免死锁,可以采取以下策略:

  (1)固定加锁顺序:所有线程按照相同的顺序获取锁

  (2)使用尝试加锁:pthread_mutex_trylock()避免阻塞

  (3)设置超时机制:pthread_mutex_timedlock()限制等待时间

  (4)避免嵌套锁:尽量减少同时持有多个锁的情况

  (5)使用锁层次结构:为锁定义层次关系,只允许按层次获取

6、互斥量类型
  • POSIX标准定义了4种互斥量类型:

  (1)PTHREAD_MUTEX_NORMAL:标准互斥量,不检测死锁和重复加锁

  (2)PTHREAD_MUTEX_ERRORCHECK:错误检查互斥量,检测死锁和重复加锁并返回错误

  (3)PTHREAD_MUTEX_RECURSIVE:递归互斥量,允许同一线程多次加锁

  (4)PTHREAD_MUTEX_DEFAULT:默认类型,通常映射为NORMAL或ERRORCHECK

  • 通过pthread_mutexattr_settype()设置:
    #include <pthread.h>
    int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);
    /*return 0 on success or a positive error number on error*/
  • 示例:
    pthread_mutex_t mutex;
    pthread_mutexattr_t attr;

    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&mutex, &attr);
    pthread_mutexattr_destroy(&attr);
7、动态初始化互斥量
  • PTHREAD_MUTEX_INITIALIZER只能用于对静态分配且拥有默认属性的互斥量进行初始化

  • 对于在堆中或者栈中分配的互斥量必须进行动态初始化,使用后必须手动销毁
    #include <pthread.h>
    int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
    int pthread_mutex_destory(pthread_mutex_t *mutex);
    /*return 0 on success or a positive error number on error*/
  • 参数mutex是需要初始化或者销毁的互斥量,attr用于定义互斥量的属性,为NULL表示默认属性

  • 示例:动态初始化互斥锁
    /*mutex to protect shared data*/
    static pthread_mutex_t mtx_mutex_init;
    /* Shared data protected by mutex */
    static long int data_mutex_init = 0;

    void *handler_mutex_init(){
        // Lock mutex before accessing shared data
        int ret = pthread_mutex_lock(&mtx_mutex_init);
        if(ret != 0){
            printf("pthread_lock error :%s\n", strerror(ret));
            return (void*)-1;
        }

        // Critical section: increment shared data
        long int tmp = data_mutex_init;
        for(int i = 0; i < 100000; i++)
            tmp ++;
        data_mutex_init = tmp;

        // Unlock mutex after accessing shared data
        ret = pthread_mutex_unlock(&mtx_mutex_init);
        if(ret != 0){
            printf("pthread_unlock error :%s\n", strerror(ret));
            return (void*)-1;
        }
        return (void*)0;
    }

    int thread_mutex_init(){
        // Initialize mutex
        int ret = pthread_mutex_init(&mtx_mutex_init, NULL);
        if(ret != 0){
            printf("pthread_mutex_init error: %s\n", strerror(ret));
            return -1;
        }

        // Create first thread
        pthread_t pth_1, pth_2;
        ret = pthread_create(&pth_1, NULL, handler_mutex_init, NULL);
        if(ret != 0){
            printf("pthread_create error:%s\n", strerror(ret));
            return -1;
        }

        // Create second thread
        ret = pthread_create(&pth_2, NULL, handler_mutex_init, NULL);
        if(ret != 0){
            printf("pthread_create error:%s\n", strerror(ret));
            goto clean_up;
        }

        // Wait for threads to complete and check their status
        void *err;
        if(pthread_join(pth_1, &err) != 0){
            printf("pthread_join error\n");
            goto clean_up;
        }
        if((intptr_t)err != 0)
            printf("thread %ld exit error\n", (long int)err);

        if(pthread_join(pth_2, &err) != 0){
            printf("pthread_join error\n");
            goto clean_up;
        }
        if((intptr_t)err != 0)
            printf("thread %ld exit error\n", (long int)err);
        
        clean_up:
            // Clean up mutex resources
            if(pthread_mutex_destroy(&mtx_mutex_init) != 0){
                printf("pthread_mutes_destory error\n");
                return -1;
            }
            
        // Print final value of shared data
        printf("the value of data_mutex_init is %ld\n", data_mutex_init);
        return 0;
}

IV 通过条件变量同步线程

1、条件变量
  • 条件变量允许一个线程就某个共享资源的状态变化通知其他线程

  • 条件变量总是与互斥量配合使用,条件变量就共享变量的状态改变发出通知,互斥变量则提供对共享变量的访问保护,防止竞争条件

  • 主要操作:
    • pthread_cond_wait():等待条件变量
    • pthread_cond_signal():唤醒一个等待线程
    • pthread_cond_broadcast():唤醒所有等待线程
2、静态分配的条件变量
  • 静态初始化使用宏PTHREAD_COND_INITIALIZER
    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
3、初始化动态分配的条件变量
  • 对于栈或堆中的条件变量需要动态初始化,使用完成后需要销毁使用
    #include <pthread.h>
    int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
    int pthread_cond_destroy(pthread_cond_t *cond);
    /*return 0 on success or error number on failure*/
  • 参数attr指定了条件变量的属性,若为NULL则表示默认属性

  • 示例:
    pthread_cond_t cond;
    pthread_cond_init(&cond, NULL);
    /*...*/
    pthread_cond_destroy(&cond);
4、通知和等待条件变量
  • 条件变量的基本操作是发送信号和等待,发送信号即在共享变量状态改变时通知处于等待状态的线程,等待则是在收到共享变量状态变化信号前一直处于阻塞状态
    #include <pthread.h>
    int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
    int pthread_cond_signal(pthread_cond_t *cond);
    int pthread_cond_broadcast(pthread_cond_t *cond);
    /*return 0 on success or error number on failure*/
  • 参数cond是指向条件变量的指针,参数mutex是指向与条件变量配合使用的互斥量的指针

  • pthread_cond_signal()只会唤醒一条处于等待状态的线程,只有一条需要唤醒的线程时推荐使用

  • pthread_cond_broadcast()会唤醒所有处于等待状态的线程,有多条线程等待时推荐使用

  • pthread_cond_wait()的执行过程:

  (1)解锁互斥量

  (2)使调用线程进入等待状态(阻塞)

  (3)当被其他线程通过signal/broadcast唤醒时,重新获取互斥锁

  (4)只有成功获取互斥锁后,函数才会返回

5、示例:生产者-消费者模型
  • 多线程消费者-生产者模型,多位消费者与多位生产者,生产者生产一件商品后通知一个处于等待中的消费者,通过互斥量和条件变量实现线程同步
/* Mutex for producer-consumer synchronization */
pthread_mutex_t mtx_cp = PTHREAD_MUTEX_INITIALIZER;
/* Condition variable for producer-consumer synchronization */
pthread_cond_t con_cp = PTHREAD_COND_INITIALIZER;
/* Count of available products */
static int available = 0;
/* Count of consumed products */
static int consumed = 0;

/* Producer thread function that creates products */
void *handler_productor(){
    sleep(1);
    /* Lock mutex before accessing shared data */
    int err = pthread_mutex_lock(&mtx_cp);
    if(err != 0){
        printf("mutex lock error: %s\n", strerror(err));
        return (void*)-1;
    }

    /* Increment available products count */
    available++;
    printf("one product produced, we will inform the consumer\n");

    /* Signal consumer that product is available */
    err = pthread_cond_signal(&con_cp);
    if(err != 0){
        printf("cont signal error: %s\n", strerror(err));
        pthread_mutex_unlock(&mtx_cp);
        return (void*)-1;
    }

    /* Unlock mutex after accessing shared data */
    err = pthread_mutex_unlock(&mtx_cp);
    if(err != 0){
        printf("mutex unlock error: %s\n", strerror(err));
        return (void*)-1;
    }

    return (void*)0;
}

/* Consumer thread function that consumes products */
void *handler_consumer(){
    /* Lock mutex before checking shared data */
    int err = pthread_mutex_lock(&mtx_cp);
    if(err != 0){
        printf("mutex lock error:%s\n", strerror(err));
        return (void*)-1;
    }

    /* Wait while no products are available */
    while(available <= 0){
        err = pthread_cond_wait(&con_cp, &mtx_cp);
        if(err != 0){
            printf("cont wait error:%s\n", strerror(err));
            pthread_mutex_unlock(&mtx_cp);
            return (void*)-1;
        }
    }

    /* Consume product and update counters */
    available--;
    consumed++;
    printf("one product has benn consumed\n");

    /* Unlock mutex after accessing shared data */
    err = pthread_mutex_unlock(&mtx_cp);
    if(err != 0){
        printf("mutex unlock error:%s\n", strerror(err));
        return (void*)-1;
    }

    return (void*)0;
}

/* Main function for producer-consumer demonstration */
int thread_producter_consumer(int argc, char *argv[]){
    /* Check command line arguments */
    if(argc < 2 || atoi(argv[1]) <= 0){
        printf("Usage:%s <consumer number>\n", argv[0]);
        return -1;
    }

    int err, tmp = atoi(argv[1]);

    /* Create producer and consumer threads */
    pthread_t pth_p[tmp],pth_c[tmp];
    for(int i = 0; i < tmp; i++){
        err = pthread_create(&pth_p[i], NULL, handler_productor, NULL);
        if(err != 0){
            printf("thread created error:%s\n", strerror(err));
            return -1;
        }
        err = pthread_create(&pth_c[i], NULL, handler_consumer, NULL);
        if(err != 0){
            printf("thread created error:%s\n", strerror(err));
            return -1;
        }
    }

    /* Wait for all threads to complete */
    for(int i = 0; i < tmp; i++){
        pthread_join(pth_p[i], NULL);
        pthread_join(pth_c[i], NULL);
    }

    /* Print total consumed products */
    printf("the consumer has totally consumed %d producteds\n", consumed);
    return 0;
}

            printf("thread created error:%s\n", strerror(err));
            return -1;
        }
        err = pthread_create(&pth_c[i], NULL, handler_consumer, NULL);
        if(err != 0){
            printf("thread created error:%s\n", strerror(err));
            return -1;
        }
    }

    /* Wait for all threads to complete */
    for(int i = 0; i < tmp; i++){
        pthread_join(pth_p[i], NULL);
        pthread_join(pth_c[i], NULL);
    }

    /* Print total consumed products */
    printf("the consumer has totally consumed %d producteds\n", consumed);
    return 0;
}

网站公告

今日签到

点亮在社区的每一天
去签到