Linux中的多线程

发布于:2024-10-10 ⋅ 阅读:(16) ⋅ 点赞:(0)

Linux线程概念

在一个程序里的一个执行路线就叫做线程(thread)。更准确的定义是:线程是“一个进程内部的控制序 列”

进程是系统分配资源的基本实体

线程是CPU调度的基本单位

POSIX线程库

创建线程

功能:创建一个新的线程
原型
 int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *
(*start_routine)(void*), void *arg);
参数
 thread:返回线程ID
 attr:设置线程的属性,attr为NULL表示使用默认属性
 start_routine:是个函数地址,线程启动后要执行的函数
 arg:传给线程启动函数的参数
返回值:成功返回0;失败返回错误码

ps -aL

查看所有执行流

两个进程pid相同,但lwp不同,lwp->(light weight process)轻量级进程

线程的私有资源

1.PCB属性私有

2.有一定的私有上下文结构

3.独立的栈结构

线程对应的函数运行完成后,线程也会结束。

线程等待

线程等待 为什么需要线程等待?

已经退出的线程,其空间没有被释放,仍然在进程的地址空间内。

创建新的线程不会复用刚才退出线程的地址空间。

功能:等待线程结束
原型
 int pthread_join(pthread_t thread, void **value_ptr);
参数
 thread:线程ID
 value_ptr:它指向一个指针,后者指向线程的返回值
返回值:成功返回0;失败返回错误码

线程终止

功能:线程终止
原型
 void pthread_exit(void *value_ptr);
参数
 value_ptr:value_ptr不要指向一个局部变量。
返回值:无返回值,跟进程一样,线程结束的时候无法返回到它的调用者(自身)
class ThreadData
{
public:
    pthread_t tid;
    char namebuffer[64];
};
void* start_routine(void* args)//可重入函数
void* start_routine(void* args)
{//可重入函数
    ThreadData* td=static_cast<ThreadData*>(args);
    int cnt=10;
    while (cnt)
    {
        //cout<<"new thread create success, name: "<<td->namebuffer<<endl;
        cout<<"cnt:  "<<cnt<<"   &cnt:  "<<&cnt<<endl;
        cnt--;
        sleep(1);
    }
    //delete td;
    pthread_exit(nullptr);
    //return nullptr;
}
vector<ThreadData*> threads;
#define NUM 10
    for(int i=0;i<NUM;i++)
    {
        ThreadData* td= new ThreadData();
        snprintf(td->namebuffer,sizeof(td->namebuffer),"%s : %d","thread",i+1);
        pthread_create(&td->tid,nullptr,start_routine,td);
        threads.push_back(td);
    }
    for(auto& iter: threads)
    {
        cout<<"create thread: "<<iter->namebuffer<<" : "<<iter->tid<<"  success"<<endl;

    }
    for(auto& iter:threads)
    {
        int n=pthread_join(iter->tid,nullptr);
        assert(n==0);
        cout<<"join : "<<iter->namebuffer<<" success"<<endl;
        delete iter;
    }
    cout<<"main thread quit"<<endl;

void* start_routine(void* args)
{//可重入函数
    ThreadData* td=static_cast<ThreadData*>(args);
    int cnt=10;
    while (cnt)
    {
        //cout<<"new thread create success, name: "<<td->namebuffer<<endl;
        cout<<"cnt:  "<<cnt<<"   &cnt:  "<<&cnt<<endl;
        cnt--;
        sleep(1);
    }
    //delete td;
    pthread_exit(nullptr);
    //return nullptr;
    return (void*)td->number;
}
for(auto& iter:threads)
    {
        void* ret=nullptr;
        int n=pthread_join(iter->tid,&ret);//void** retp; *retp=(void*)td->number
        assert(n==0);
        cout<<"join : "<<iter->namebuffer<<" success,number: "<<(long long)ret<<endl;
        delete iter;
    }
    cout<<"main thread quit"<<endl;

线程异常,收到信号,整个进程都会退出。

线程被取消其退出码为-1(PTHREAD_CANCELED)

pthread_t tid;
    pthread_create(&tid,nullptr,start_routine,(void*)"thread1");
    string main_id = changeId(pthread_self());
    pthread_detach(tid);

在主线程中将新线程分离

int g_val=100;
string changeId(const pthread_t& thread_id)
{
    char tid[120];
    snprintf(tid,sizeof tid,"  0x%lx",thread_id);
    return tid;
}
void* start_routine(void* args)
{
    string threadname=static_cast<const char*>(args);
    int cnt=5;
    while(true)
    {
        cout<<threadname<<"running ..."<<changeId(pthread_self())<<"   g_val: "<<g_val<<"    &g_val: "<<&g_val<<endl;
        g_val++;
        sleep(1);
    }
}
pthread_t tid;
    pthread_create(&tid,nullptr,start_routine,(void*)"thread1");
    string main_id = changeId(pthread_self());
    pthread_detach(tid);

    cout<<"main thread running, ...new thread id: "<<changeId(tid)<<"main thread id: "<<main_id<<endl;
    while(true)
    {
        cout<<"main thread running... new thread id: "<<changeId(tid)
            <<"main thread id: "<<main_id<<"  g_val: "<<g_val<<"  &g_val: "
                <<&g_val<<endl;
        sleep(1);
    }
__thread int g_val=100;

添加__thread ,可以将一个内置类型设为线程局部储存,给每个线程各一份

封装thread

#pragma once

#include<iostream>
#include<string>
#include<cstring>
#include<functional>
#include<pthread.h>

using namespace std;

class Thread;
class Context
{
public:
    Thread* this_;
    void* args_;
public:
    Context()
        :this_(nullptr),args_(nullptr)
    {}
    ~Context()
    {}
};

class Thread
{
public:
    typedef function<void*(void*)> func_t;
    const int num =1024;
public:
    Thread(func_t func,void* args,int number)
        :func_(func),args_(args)
    {
        char buffer[num];
        snprintf(buffer,sizeof buffer,"thread-%d",number);
        name_=buffer;
        Context* ctx=new Context();
        ctx->this_=this;
        ctx->args_=args_;
        int n=pthread_create(&tid_,nullptr,start_routine,ctx);
    }
    static void* start_routine(void* args)
    {
        Context* ctx=static_cast<Context*>(args);
        void* ret=ctx->this_->run(ctx->args_);
        delete ctx;
        return ret;
    }

    void join()
    {
        int n=pthread_join(tid_,nullptr);
    }
    void* run(void* args)
    {
        return func_(args);
    }
    ~Thread()
    {}
private:
    string name_;
    pthread_t tid_;
    func_t func_;
    void* args_;
};
void* thread_run(void* args)
{
    string work_type=static_cast<const char*>(args);
    while(true)
    {
        cout<<"我是一个新线程,我正在作: "<<work_type<<endl;
        sleep(1);
    }
}

    unique_ptr<Thread> thread1(new Thread(thread_run,(void*)"hellothread",1));
    unique_ptr<Thread> thread2(new Thread(thread_run,(void*)"hellothread",2));
    unique_ptr<Thread> thread3(new Thread(thread_run,(void*)"hellothread",3));
    thread1->join();
    thread2->join();
    thread3->join();

互斥量的接口

初始化互斥量

初始化互斥量有两种方法:

方法1,静态分配:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER 

方法2,动态分配:

int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict 
attr); 
 参数: 
 mutex:要初始化的互斥量 
 attr:NULL

销毁互斥量

销毁互斥量需要注意:

使用 PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁

不要销毁一个已经加锁的互斥量

已经销毁的互斥量,要确保后面不会有线程再尝试加锁

int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex); 
int pthread_mutex_unlock(pthread_mutex_t *mutex); 
返回值:成功返回0,失败返回错误号 

单纯的 i++ 或者 ++i 都不是原子的,有可能会有数据一致性问题

大多数体系结构都提供了swap或exchange指令,该指令的作用是把寄存器和内存单 元的数据相交换,由于只有一条指令,保证了原子性,即使是多处理器平台,访问内存的 总线周期也有先后,一 个处理器上的交换指令执行时另一个处理器的交换指令只能等待总线周期。

共识:

1.CPU内寄存器只有一套被所有的执行流共享

2.CPU内寄存器的内容是每个执行流私有的->运行时上下文

封装mutex

#pragma once

#include<iostream>
#include<pthread.h>

class Mutex
{
public:
    Mutex(pthread_mutex_t* lock_p=nullptr)
        :lock_p_(lock_p)
    {}
    void lock()
    {
        if(lock_p_) pthread_mutex_lock(lock_p_);
    }
    void unlock()
    {
        if(lock_p_) pthread_mutex_unlock(lock_p_);
    }
    ~Mutex()
    {} 
private:
    pthread_mutex_t* lock_p_;
};

class LockGuard
{
public:
    LockGuard(pthread_mutex_t* mutex)
        :mutex_(mutex)
    {
        mutex_.lock();
    }
    ~LockGuard()
    {
        mutex_.unlock();
    }
private:
    Mutex mutex_;
};

死锁

死锁四个必要条件

互斥条件:一个资源每次只能被一个执行流使用

请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放

不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺

循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系

Linux线程同步

条件变量

当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。

例如一个线程访问队列时,发现队列为空,它只能等待,只到其它线程将一个节点添加到队列中。这种情 况就需要用到条件变量。

条件变量函数 初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict 
attr); 
参数: 
 cond:要初始化的条件变量 
 attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满足

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex); 
 参数: 
 cond:要在这个条件变量上等待 
 mutex:互斥量,后面详细解释

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond); 
int pthread_cond_signal(pthread_cond_t *cond); 
#include<iostream>
#include<string>
#include<unistd.h>
#include<pthread.h>

using namespace std;
int tickets=1000;
pthread_mutex_t mutex= PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;

void* start_routine(void* args)
{
    char* name= static_cast<char*>(args);
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond,&mutex);
        cout<<name<<" -> "<<tickets<<endl;
        tickets--;
        pthread_mutex_unlock(&mutex);
    }
    
}
int main()
{
    //通过条件变量控制线程执行
    pthread_t t[5];
    for(int i=0;i<5;i++)
    {
        char* name= new char[64];
        snprintf(name,64,"thread: %d",i+1);
        pthread_create(t+i,nullptr,start_routine,name);
    }
    
    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond);//唤醒条件变量下的线程
    }
    for(int i=0;i<5;i++)
    {
        pthread_join(t[i],nullptr);
    }
    
    return 0;
}

 

生产者消费者模型

总结(321原则):

3种关系:

生产者和生产者(互斥)

消费者和消费者(互斥)

生产者和消费者(互斥->保证共享资源的安全性,同步)

2种角色:

生产者线程,消费者线程

1个交易场所:

一段特点结构的缓存区