WIN32核心编程 - 线程操作(三) 线程优先级 - 生产者与消费者模式

发布于:2024-07-07 ⋅ 阅读:(42) ⋅ 点赞:(0)

目录

线程优先级和调度

生产者与消费者模式

关键概念

实现细节

案例描述(一对一)

案例描述(多对一)


线程优先级和调度

  • 在Windows操作系统中,线程优先级与进程优先级共同决定了线程的最终优先级。
  • 进程优先级类决定了进程内所有线程的基本优先级,并且每个线程可以在此基础上有自己的优先级设置。

  • Windows提供了几个不同的进程优先级类:

    • IDLE_PRIORITY_CLASS

    • BELOW_NORMAL_PRIORITY_CLASS

    • NORMAL_PRIORITY_CLASS

    • ABOVE_NORMAL_PRIORITY_CLASS

    • HIGH_PRIORITY_CLASS

    • REALTIME_PRIORITY_CLASS

  • 在每个进程优先级类内,线程可以有以下优先级:

    • THREAD_PRIORITY_LOWEST

    • THREAD_PRIORITY_BELOW_NORMAL

    • THREAD_PRIORITY_NORMAL

    • THREAD_PRIORITY_ABOVE_NORMAL

    • THREAD_PRIORITY_HIGHEST

    • THREAD_PRIORITY_TIME_CRITICAL

  • 线程调度与时间碎片

    • Windows使用抢占式调度来分配CPU时间给线程。每个线程都会被分配一个时间片,它是线程可以在被挂起之前连续运行的时间量。

    • Windows中的实时优先级类别允许线程以最小的延迟执行。这些线程几乎总是优先于其他线程运行,除非有其他更高优先级的实时线程。这对于需要精确计时或快速响应的任务至关重要。

#include <iostream>
#include <Windows.h>

DWORD WINAPI WorkThread(LPVOID lp)
{	
	std::cout << GetThreadPriority(GetCurrentThread());
	return 0;
}

int main()
{
	// 设置进程优先级
	SetPriorityClass(GetCurrentProcess(), HIGH_PRIORITY_CLASS);

	// 获取进程优先级
	GetPriorityClass(GetCurrentProcess());

	// 创建执行线程
	HANDLE hThread = CreateThread(NULL, 0, WorkThread, NULL, 0, NULL);
	SetThreadPriority(hThread, THREAD_PRIORITY_HIGHEST);
	WaitForSingleObject(hThread, INFINITE);

	return 0;
}

生产者与消费者模式

关键概念

  1. 生产者(Producer) - 负责生成数据并将其放入缓冲区的线程或进程。通常会在没有足够空间放入新数据时等待。

  2. 消费者(Consumer) - 负责从缓冲区取出数据并处理它的线程或进程。如果缓冲区为空,消费者将等待直到有数据可用。

  3. 缓冲区(Buffer) - 一个有限的存储空间,用于生产者存放数据,消费者从中取出数据。它可以是一个队列、数组或任何其他形式的集合。

  4. 同步机制 - 生产者和消费者需要通过某种方式来协调它们的工作,以确保它们不会在同一时间内对缓冲区进行写入或读取操作。常见的同步机制包括互斥锁(mutexes)、信号量(semaphores)和条件变量(condition variables)。

实现细节

  1. 互斥锁 - 确保在任意时刻只有一个线程可以访问缓冲区。这防止了并发访问导致的数据竞争条件。

  2. 信号量 - 可以用来表示缓冲区中可用资源的数量。通常有两个信号量:一个表示空位的数量(对生产者而言),另一个表示数据项的数量(对消费者而言)。

案例描述(一对一)

  • 假设有一个工厂(生产者)和一个零售商店(消费者)。工厂生产产品并将其放入仓库(缓冲区),而零售商店从仓库中取出产品并卖给客户。仓库的空间是有限的,所以当它满了时,工厂必须停止生产,等待零售商店取走一些产品。同样,如果仓库是空的,零售商店必须等待工厂生产新的产品。
  1. 初始化:设置一个有限大小的缓冲区以及必要的同步机制。

  2. 生产者线程执行以下操作:

    • 确认缓冲区中是否有空间可用。

    • 如果缓冲区已满,则等待直到有空位。

    • 生产一个数据项并将其放入缓冲区中。

    • 通知消费者缓冲区中有新的数据项可用。

  3. 消费者线程执行以下操作:

    • 确认缓冲区内是否有数据项。

    • 如果缓冲区为空,则等待直到有数据项。

    • 从缓冲区中取出一个数据项进行处理。

    • 通知生产者已从缓冲区中取出数据项,使其可以添加新的数据项。

  4. 同步机制:

    • 使用互斥锁(mutex)保护对缓冲区的访问,以保证任一时刻只有一个线程可以操作缓冲区。

    • 使用条件变量(condition variable)使线程在特定条件不满足时等待,并在条件满足时接到通知以继续执行。

  5. 终止过程:

    • 生产者在完成既定数量的生产后终止。

    • 消费者在消费完所有生产者生产的产品后终止。

#include <iostream>
#include <Windows.h>

// 缓冲容量
#define BUFFER_SIZE 5
int Buffer[BUFFER_SIZE] = { 0 };

// 交接信息
int In = 0;
int Out = 0;
int Sum = 0;
int Count = 0;

// 同步对象
HANDLE hMutex = NULL;
HANDLE hProduce = NULL;
HANDLE hConsume = NULL;

// 生产者
DWORD WINAPI Produce(LPVOID lp)
{
	for (size_t i = 0; i < Sum; i++)
	{
		// 等待是否可以生成的信号
		WaitForSingleObject(hProduce, INFINITE);

		// 获取互斥体所有权
		WaitForSingleObject(hMutex, INFINITE);

		// 生成商品放入仓库
		Buffer[In] = i;
		In = (In + 1) % BUFFER_SIZE;
		++Count;
		std::cout << "Produce -> " << i << std::endl;

		// 释放互斥体所有权
		ReleaseMutex(hMutex);

		if (Count == 1)
		{
			SetEvent(hConsume);
		}
		else if (Count < BUFFER_SIZE)
		{
			SetEvent(hProduce);
		}
		else
		{
			ResetEvent(hProduce);
		}

		Sleep(rand() % 100);
	}

	return 0;
}

// 消费者
DWORD WINAPI Consume(LPVOID lp)
{
	int nConut = 1;

	while (1)
	{
		// 等待是否可以消费的信号
		WaitForSingleObject(hConsume, INFINITE);

		// 获取互斥体所有权
		WaitForSingleObject(hMutex, INFINITE);

		// 消费商品取出仓库
		int Data = Buffer[Out];
		Out = (Out + 1) % BUFFER_SIZE;
		--Count;
		std::cout << "Consume -> " << Data << std::endl;

		// 释放互斥体所有权
		ReleaseMutex(hMutex);

		if (Count == BUFFER_SIZE - 1)
		{
			SetEvent(hProduce);
		}

		if (Count > 0)
		{
			SetEvent(hConsume);
		}
		else
		{
			ResetEvent(hConsume);
		}

		Sleep(rand() % 100);

		// 条件满足 结束线程
		if (nConut == Sum) break;
	}

	return 0;
}

int main()
{
	Sum = 20;

	// 初始同步对象
	hMutex = CreateMutex(NULL, FALSE, NULL);
	hProduce = CreateEvent(NULL, TRUE, TRUE, NULL);
	hConsume = CreateEvent(NULL, TRUE, FALSE, NULL);

	// 生产消费线程
	HANDLE thProduce = CreateThread(NULL, 0, Produce, NULL, 0, NULL);
	HANDLE thConsume = CreateThread(NULL, 0, Consume, NULL, 0, NULL);

	// 等待线程完成
	WaitForSingleObject(thProduce, INFINITE);
	WaitForSingleObject(thConsume, INFINITE);

	// 关闭对象句柄
	CloseHandle(hMutex);
	CloseHandle(hProduce);
	CloseHandle(hConsume);
	CloseHandle(thProduce);
	CloseHandle(thConsume);

	return 0;
}

案例描述(多对一)

  • 假设有一个工厂,拥有多条生产线(生产者线程),每条生产线负责生产不同类型的产品。这些产品随后会被送往一个共享的仓库(缓冲区),以便单个零售商(消费者线程)从中取走产品并销售。由于仓库的容量有限,当仓库满时,所有生产线必须暂停生产,直到仓库中有足够的空间再继续生产。同样地,如果仓库为空,零售商也必须等待,直到仓库中有产品可供取出。
  • 关键步骤

  • 初始化:设置一个有限大小的缓冲区以及必要的互斥锁和条件变量(或事件)。

  • 生产者线程执行以下操作:

    1. 确认缓冲区中是否有空间可用。

    2. 如果缓冲区已满,则等待直到有空位(通过条件变量或事件)。

    3. 获取互斥锁,生产一个数据项并将其放入缓冲区中,然后释放互斥锁。

    4. 通知消费者缓冲区中有新的数据项可用。

  • 消费者线程执行以下操作:

    1. 确认缓冲区内是否有数据项。

    2. 如果缓冲区为空,则等待直到有数据项(通过条件变量或事件)。

    3. 获取互斥锁,从缓冲区中取出一个数据项进行处理,然后释放互斥锁。

    4. 通知生产者已从缓冲区中取出数据项,使其可以添加新的数据项。

  • 实现注意事项

    • 死锁避免:确保获取和释放锁的顺序一致,避免出现死锁情况。

    • 条件竞争处理:使用互斥锁保护所有访问共享资源(例如,缓冲区和计数器)的操作,确保每次只有一个线程可以执行这些操作。

    • 适当的信号通知:在修改了可能影响其他线程行为的共享状态后(例如,添加或移除缓冲区中的数据项),及时通过条件变量或事件发送适当的信号,以唤醒等待的线程。

#include <iostream>
#include <queue>
#include <Windows.h>

// 共享数据
HANDLE hThreads[3] = { NULL };
std::queue<int> Queue;
CONST int Max = 10;

// 线程竞态
CRITICAL_SECTION QueueLock = { 0 };
HANDLE hQueueNotEmpty = NULL;
HANDLE hQueueNotFull = NULL;
BOOL bProduceFinished = FALSE;

// 生产者线程
DWORD WINAPI Produce(LPVOID lp)
{
	for (size_t i = 0; i < 50; i++)
	{
		WaitForSingleObject(hQueueNotFull, INFINITE);
		EnterCriticalSection(&QueueLock);
		
		if (Queue.size() >= Max)
		{
			ResetEvent(hQueueNotFull);
		}
		else
		{
			Queue.push(i);
			std::cout << "ID -> " << GetCurrentThreadId() << " Produce -> " << i << std::endl;
			SetEvent(hQueueNotEmpty);
		}

		LeaveCriticalSection(&QueueLock);

		Sleep(rand() % 100);
	}

	EnterCriticalSection(&QueueLock);
	bProduceFinished = TRUE;
	LeaveCriticalSection(&QueueLock);
	SetEvent(hQueueNotEmpty);

	return 0;
}


// 消费者线程
DWORD WINAPI Consume(LPVOID lp)
{
	while (true)
	{
		WaitForSingleObject(hQueueNotEmpty, INFINITE);
		EnterCriticalSection(&QueueLock);

		while (!Queue.empty())
		{
			int Data = Queue.front();
			Queue.pop();
			std::cout << "ID -> " << GetCurrentThreadId() << " Consume -> " << Data << std::endl;
		}

		if (bProduceFinished && Queue.empty())
		{
			LeaveCriticalSection(&QueueLock);
			break;
		}

		SetEvent(hQueueNotFull);

		LeaveCriticalSection(&QueueLock);

		Sleep(rand() % 100);
	}

	return 0;
}


int main()
{
	// 初始同步对象
	InitializeCriticalSection(&QueueLock);
	hQueueNotFull = CreateEvent(NULL, TRUE, TRUE, NULL);
	hQueueNotEmpty = CreateEvent(NULL, FALSE, FALSE, NULL);

	// 启动执行线程
	hThreads[0] = CreateThread(NULL, 0, Produce, NULL, 0, NULL);
	hThreads[1] = CreateThread(NULL, 0, Produce, NULL, 0, NULL);
	hThreads[2] = CreateThread(NULL, 0, Consume, NULL, 0, NULL);

	// 等待线程结束
	WaitForMultipleObjects(3, hThreads, TRUE, INFINITE);

	// 释放对象资源
	DeleteCriticalSection(&QueueLock);
	for (auto i : hThreads) { CloseHandle(i); }
	CloseHandle(hQueueNotFull);
	CloseHandle(hQueueNotEmpty);

	return 0;
}


网站公告

今日签到

点亮在社区的每一天
去签到