目录
问题描述
在编写多线程的时候,有一种情况是十分常见的。那就是,有些公共数据修改的机会比较少。相比较改写,它们读的机会反而高的多。通常而言,在读的过程中,往往伴随着查找的操作,中间耗时很长。给这种代码段加锁,会极大地降低我们程序的效率。这类问题就归结为读者写者问题。
那么有没有一种方法,可以专门处理这种多读少写的情况呢?
有的,有的,那就是读写锁。
读写锁
主要思想
那么根据上面的问题,我们就可以设想,允许多个进程同时读一个共享对象,但不允许一个写者进程和其他写者进程或读者进程同时访问共享对象。即:保证一个写者进程必须与其他进程互斥的访问共享对象的同步问题;读者-写者问题常用来测试新同步原语。
对上面的设想进行抽象:读者与写者是互斥的,写者与写者是互斥的,读者与读者不互斥;有两个角色,一个是写者,一个是读者。一个交易场所:共享对象,也可以叫做临界资源。不难发现这个抽象的模型就类似于生产消费者模型。
所以我们就将此用表格来表示就为:
归结为:写独占,读共享,写锁优先级高。
那么对于写者来说,它和其他任何进程都是互斥的,因此对每一个写者进程都给一个互斥信号量的P、V操作即可;而读者进程的问题就较为复杂,它与写者进程是互斥的,而又与其他的读者进程是同步的,因此不能简单的利用P、V操作解决。
我们先伪代码实现,然后再学习库。
下面给出三种经典方案来解决读者和写者之间、读者和读者之间、写者和写者之间的同步与互斥问题:
代码伪实现
读者优先算法
它确保当读者和写者同时请求访问共享资源时,读者会获得优先访问权。
为了实现读者与写者之间的互斥,我们要设置一个互斥信号量write_sem初始值为1。再设置一个整型变量conut表示正在读的进程数目。
- 当count==1时,表示当前第一个读者已经开始读,此时就需要将写者阻塞,禁止访问共享资源。同理,仅当读进程在执行了count减一操作后其值为0时,才需要执行将写者唤醒操作,以便让写者进行操作。
- 由于count是一个可被多个读者进程访问的临界资源,因此为其设置一个互斥信号量rmutex;
伪代码实现:
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;
#define MAX_READERS 5
int count = 0; // 当前活跃的读者数量
pthread_mutex_t mutex; // 保护 count
sem_t write_sem; // 写者信号量(初始为1)
void *reader(void *arg)
{
int id = *(int*)arg;
while(1)
{
pthread_mutex_lock(&mutex);
count++;
if(count == 1)
{
sem_wait(&write_sem); // 第一个读者将写者阻塞
}
pthread_mutex_unlock(&mutex);
// 读取数据
printf("Reader %d reading\n", id);
pthread_mutex_lock(&mutex);
count--;
if(count == 0)
{
sem_post(&write_sem);
}
printf("Reader %d read end\n", id);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
void *writer(void *arg)
{
int id = *(int*)arg;
while (1)
{
sem_wait(&write_sem);
printf("Writer %d writing\n", id);
printf("Writer %d write end\n", id);
sem_post(&write_sem);
sleep(1);
}
}
int main()
{
pthread_t readers[MAX_READERS], writers[2]; // 5个读者,2个写者
int ids[MAX_READERS];
pthread_mutex_init(&mutex, NULL);
sem_init(&write_sem, 0, 1); // 初始写信号量为1(可进入)
// 创建读者线程
for (int i = 0; i < MAX_READERS; i++) {
ids[i] = i + 1;
pthread_create(&readers[i], NULL, reader, &ids[i]);
}
// 创建写者线程
for (int i = 0; i < 2; i++) {
pthread_create(&writers[i], NULL, writer, &ids[i]);
}
// 等待线程结束
for (int i = 0; i < MAX_READERS; i++) {
pthread_join(readers[i], NULL);
}
for (int i = 0; i < 2; i++) {
pthread_join(writers[i], NULL);
}
pthread_mutex_destroy(&mutex);
sem_destroy(&write_sem);
return 0;
}
运行会有一下效果:
由于测试文本数目较小,当训练集数目较大时,则写者亦有可能出现如上情况,即:等待读者全部结束之后才逐步执行写者。我们称这样的算法为读者优先算法,也就是说,当存在读者时,写者操作将被延迟,并且只要有一个读者活跃,随后而来的读者都将被允许访问文件。这样的方式下,会导致写者可能长时间等待,且存在写者“饿死”的情况。
写者优先算法
写者优先算法是读者-写者问题的另一种解决方案,它确保当有写者等待时,新到达的读者必须等待,直到所有等待的写者完成操作。这样可以防止写者饥饿。
对上面的代码修改的地方不是很多,增加一个信号量并且在上面的程序中 writer()和reader()函数中各增加一对PV操作,就可以得到写进程优先的解决程序。
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;
#define MAX_READERS 5
#define MAX_WRITERS 3
int read_count = 0; // 当前活跃的读者数量
int writer_count = 0;
pthread_mutex_t mutex; // 保护 count
pthread_mutex_t write_mutex;
sem_t write_sem; // 写者信号量(初始为1)
sem_t read_sem; // 读者信号量(初始为1)
// 写者优先
// 当有写者等待时,新的读者不会插队
void *reader(void *arg)
{
int id = *(int*)arg;
while(1)
{
// 防止写者等待时新的读者插队
sem_wait(&read_sem);
pthread_mutex_lock(&mutex);
read_count++;
if(read_count == 1)
{
sem_wait(&write_sem); // 第一个读者将写者阻塞
}
pthread_mutex_unlock(&mutex);
sem_post(&read_sem); // 允许别的读者操作
// 读取数据
printf("Reader %d reading\n", id);
pthread_mutex_lock(&mutex);
read_count--;
if(read_count == 0)
{
sem_post(&write_sem);
}
printf("Reader %d read end\n", id);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
void *writer(void *arg)
{
int id = *(int*)arg;
while (1)
{
pthread_mutex_lock(&write_mutex);
writer_count++;
if (writer_count == 1)
{
sem_wait(&read_sem);
}
pthread_mutex_unlock(&write_mutex);
sem_wait(&write_sem);
printf("Writer %d writing\n", id);
printf("Writer %d write end\n", id);
sem_post(&write_sem);
pthread_mutex_lock(&write_mutex);
writer_count--;
if(writer_count == 0)
{
sem_post(&read_sem);
}
pthread_mutex_unlock(&write_mutex);
sleep(1);
}
}
int main()
{
pthread_t readers[MAX_READERS], writers[2]; // 5个读者,2个写者
int reader_ids[MAX_READERS], writer_ids[MAX_WRITERS];
pthread_mutex_init(&mutex, NULL);
pthread_mutex_init(&write_mutex, NULL);
sem_init(&write_sem, 0, 1); // 初始写信号量为1(可进入)
sem_init(&read_sem, 0, 1); // 初始写信号量为1(可进入)
// 创建读者线程
for (int i = 0; i < MAX_READERS; i++) {
reader_ids[i] = i + 1;
pthread_create(&readers[i], NULL, reader, &reader_ids[i]);
}
// 创建写者线程
for (int i = 0; i < MAX_WRITERS; i++) {
writer_ids[i] = i + 1;
pthread_create(&writers[i], NULL, writer, &writer_ids[i]);
}
// 等待线程结束
for (int i = 0; i < MAX_READERS; i++) {
pthread_join(readers[i], NULL);
}
for (int i = 0; i < 2; i++) {
pthread_join(writers[i], NULL);
}
pthread_mutex_destroy(&mutex);
pthread_mutex_destroy(&write_mutex);
sem_destroy(&write_sem);
sem_destroy(&read_sem);
return 0;
}
运行效果如下:
因为当前的测试用例比较小,所以跟上一个算法的运行效果一样,这又是很正常的事情。
读者写者公平算法
公平的读者-写者算法旨在平衡读者和写者的访问,避免任何一方饥饿。这种实现通常使用队列或额外的同步机制来确保请求按照到达顺序被处理。
该算法特点就是无饥饿。请求按照到达顺序被处理。
实现的主要思想是:
使用一个队列信号量:所有线程(读者和写者)必须先获取这个信号量
记录等待的写者数量:当有写者等待时,新读者不能插队
保持读者并发性:允许多个读者同时读,但必须尊重写者的等待
代码实现:
#include <iostream>
#include <pthread.h>
#include <semaphore.h>
#include <unistd.h>
using namespace std;
#define MAX_READERS 5
#define MAX_WRITERS 3
int read_count = 0;
int writer_wait_count = 0;
pthread_mutex_t mutex; // 保护 count
sem_t queue_sem; // 公平队列信号量
sem_t write_sem; // 写者信号量(初始为1)
sem_t read_sem; // 读者信号量(初始为1)
// 写者优先
// 当有写者等待时,新的读者不会插队
void *reader(void *arg)
{
int id = *(int*)arg;
while(1)
{
// 进入公平队列
sem_wait(&queue_sem);
// 检查是否有写者等待
sem_wait(&read_sem);
sem_post(&queue_sem);
pthread_mutex_lock(&mutex);
read_count++;
if(read_count == 1)
{
sem_wait(&write_sem); // 第一个读者将写者阻塞
}
pthread_mutex_unlock(&mutex);
sem_post(&read_sem); // 允许别的读者操作
// 读取数据
printf("Reader %d reading\n", id);
pthread_mutex_lock(&mutex);
read_count--;
if(read_count == 0)
{
sem_post(&write_sem);
}
printf("Reader %d read end\n", id);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
void *writer(void *arg)
{
int id = *(int*)arg;
while (1)
{
// 进入公平队列
sem_wait(&queue_sem);
// 标记有写者等待
pthread_mutex_lock(&mutex);
writer_wait_count++;
pthread_mutex_unlock(&mutex);
// 离开队列,允许下一个线程
sem_post(&queue_sem);
sem_wait(&write_sem); // 获取写锁
printf("Writer %d writing\n", id);
printf("Writer %d write end\n", id);
sem_post(&write_sem);
pthread_mutex_lock(&mutex);
writer_wait_count--;
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
int main()
{
pthread_t readers[MAX_READERS], writers[2]; // 5个读者,2个写者
int reader_ids[MAX_READERS], writer_ids[MAX_WRITERS];
pthread_mutex_init(&mutex, NULL);
sem_init(&write_sem, 0, 1); // 初始写信号量为1(可进入)
sem_init(&read_sem, 0, 1); // 初始写信号量为1(可进入)
sem_init(&queue_sem, 0, 1);
// 创建读者线程
for (int i = 0; i < MAX_READERS; i++) {
reader_ids[i] = i + 1;
pthread_create(&readers[i], NULL, reader, &reader_ids[i]);
}
// 创建写者线程
for (int i = 0; i < MAX_WRITERS; i++) {
writer_ids[i] = i + 1;
pthread_create(&writers[i], NULL, writer, &writer_ids[i]);
}
// 等待线程结束
for (int i = 0; i < MAX_READERS; i++) {
pthread_join(readers[i], NULL);
}
for (int i = 0; i < 2; i++) {
pthread_join(writers[i], NULL);
}
pthread_mutex_destroy(&mutex);
sem_destroy(&write_sem);
sem_destroy(&read_sem);
return 0;
}
运行效果如下:
库函数的学习
读写锁接口
设置读写优先
int pthread_rwlockattr_setkind_np(pthread_rwlockattr_t *attr, int pref);
pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和
PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
参数说明:
- attr:类型:
pthread_rwlockattr_t *,
其作用为指向读写锁属性对象的指针,用于配置锁的行为。 pref 共有 3 种选择
PTHREAD_RWLOCK_PREFER_READER_NP (默认设置) 读者优先,可能会导致写者饥饿情况
PTHREAD_RWLOCK_PREFER_WRITER_NP 写者优先,目前有 BUG,导致表现行为和PTHREAD_RWLOCK_PREFER_READER_NP 一致
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP 写者优先,但写者不能递归加锁
返回值:
成功:返回
0
。失败:返回错误码(非零值)。
初始化
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t
*restrict attr);
参数说明:
rwlock
:类型:pthread_rwlock_t *,
其作用为指向要初始化的读写锁对象的指针。必须指向有效的内存地址,通常为全局变量或堆分配的对象。attr
:指向读写锁属性对象的指针,用于配置锁的额外行为(如进程共享、优先级策略等)如果为NULL
,则使用默认属性初始化锁(读者优先、线程间可见)。如果需自定义行为,需先通过pthread_rwlockattr_init
创建属性对象并配置。
返回值:
成功:返回
0
。失败:返回错误码(非零值)。
销毁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
参数说明:
rwlock
:类型:pthread_rwlock_t *,
其作用为指向要初始化的读写锁对象的指针。必须指向已通过pthread_rwlock_init
初始化或PTHREAD_RWLOCK_INITIALIZER
静态初始化的有效锁对象。
返回值:
成功:返回
0
。失败:返回错误码(非零值)。
加锁和解锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
前两个分别为获取读锁与写锁。最后一个为释放锁。
参数说明:
rwlock
:类型:pthread_rwlock_t *,
其作用为指向要初始化的读写锁对象的指针。必须指向已通过pthread_rwlock_init
初始化或PTHREAD_RWLOCK_INITIALIZER
静态初始化的有效锁对象。
返回值:
成功:返回
0
。失败:返回错误码(非零值)。
读写锁使用案例
代码粘自别处,非本人锁敲。
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <unistd.h>
#include <pthread.h>
volatile int ticket = 1000;
pthread_rwlock_t rwlock;
void *reader(void *arg)
{
char *id = (char *)arg;
while (1)
{
pthread_rwlock_rdlock(&rwlock);
if (ticket <= 0)
{
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
void *writer(void *arg)
{
char *id = (char *)arg;
while (1)
{
pthread_rwlock_wrlock(&rwlock);
if (ticket <= 0)
{
pthread_rwlock_unlock(&rwlock);
break;
}
printf("%s: %d\n", id, --ticket);
pthread_rwlock_unlock(&rwlock);
usleep(1);
}
return nullptr;
}
struct ThreadAttr
{
pthread_t tid;
std::string id;
};
std::string create_reader_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread reader ", std::ios_base::ate);
oss << i;
return oss.str();
}
std::string create_writer_id(std::size_t i)
{
// 利用 ostringstream 进行 string 拼接
std::ostringstream oss("thread writer ", std::ios_base::ate);
oss << i;
return oss.str();
}
void init_readers(std::vector<ThreadAttr> &vec)
{
for (std::size_t i = 0; i < vec.size(); ++i)
{
vec[i].id = create_reader_id(i);
pthread_create(&vec[i].tid, nullptr, reader, (void *)vec[i].id.c_str());
}
}
void init_writers(std::vector<ThreadAttr> &vec)
{
for (std::size_t i = 0; i < vec.size(); ++i)
{
vec[i].id = create_writer_id(i);
pthread_create(&vec[i].tid, nullptr, writer, (void *)vec[i].id.c_str());
}
}
void join_threads(std::vector<ThreadAttr> const &vec)
{
// 我们按创建的 逆序 来进行线程的回收
for (std::vector<ThreadAttr>::const_reverse_iterator it = vec.rbegin(); it !=
vec.rend();
++it)
{
pthread_t const &tid = it->tid;
pthread_join(tid, nullptr);
}
}
void init_rwlock()
{
#if 0 // 写优先
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&rwlock, &attr);
pthread_rwlockattr_destroy(&attr);
#else // 读优先,会造成写饥饿
pthread_rwlock_init(&rwlock, nullptr);
#endif
}
int main()
{
// 测试效果不明显的情况下,可以加大 reader_nr
// 但也不能太大,超过一定阈值后系统就调度不了主线程了
const std::size_t reader_nr = 1000;
const std::size_t writer_nr = 2;
std::vector<ThreadAttr> readers(reader_nr);
std::vector<ThreadAttr> writers(writer_nr);
init_rwlock();
init_readers(readers);
init_writers(writers);
join_threads(writers);
join_threads(readers);
pthread_rwlock_destroy(&rwlock);
}
如图仅可看到写饥饿