这段代码实现了一个基于 Windows I/O 完成端口(IOCP)的多线程任务队列系统。它通过 IOCP 将任务分发到线程池中执行,并通过一个线程安全的队列(std::list<std::string>
)来管理任务数据。以下是对代码的详细讲解:
1. 宏定义和枚举
#define IOCP_LIST_EMPTR 0
#define IOCP_LIST_PUSH 1
#define IOCP_LIST_POP 2
enum {
IocpListEmpty,
IocpListPush,
IocpListPop
};
宏定义和枚举用于定义操作类型:
IOCP_LIST_EMPTR
和IocpListEmpty
:表示清空队列的操作。IOCP_LIST_PUSH
和IocpListPush
:表示向队列中添加数据的操作。IOCP_LIST_POP
和IocpListPop
:表示从队列中取出数据的操作。
2. 数据结构 IOCP_PARAM
typedef struct IocpParam
{
int nOperator; // 操作类型
std::string strData; // 数据
_beginthread_proc_type cbFunc; // 回调函数
IocpParam(int op, const char* sData, _beginthread_proc_type cb = NULL) {
nOperator = op;
strData = sData;
cbFunc = cb;
}
IocpParam() {
nOperator = -1;
}
} IOCP_PARAM;
IOCP_PARAM
是一个结构体,用于封装任务信息:nOperator
:指定操作类型(如IocpListPush
或IocpListPop
)。strData
:存储任务相关的数据。cbFunc
:回调函数指针,用于在任务完成后调用。
3. 线程函数 threadQueueEntry
void threadQueueEntry(HANDLE hIOCP) {
std::list<std::string> lstString; // 线程安全的队列
DWORD dwTransferred = 0;
ULONG_PTR CompletionKey = 0;
OVERLAPPED* poverlapped = NULL;
while (GetQueuedCompletionStatus(hIOCP, &dwTransferred, &CompletionKey, &poverlapped, INFINITE)) {
if ((dwTransferred == 0) && (CompletionKey == NULL)) {
printf("thread is prepare to exit!\r\n");
break;
}
IOCP_PARAM* pParam = (IOCP_PARAM*)CompletionKey;
if (pParam->nOperator == IocpListPush) {
lstString.push_back(pParam->strData);
}
else if (pParam->nOperator == IocpListPop) {
std::string* pStr = NULL;
if (lstString.size() > 0) {
pStr = new std::string(lstString.front());
lstString.pop_front();
}
if (pParam->cbFunc) {
pParam->cbFunc(pStr);
}
}
else if (pParam->nOperator == IocpListEmpty) {
lstString.clear();
}
delete pParam;
}
_endthread();
}
这是线程的入口函数,负责处理从完成端口(IOCP)接收到的任务。
使用
GetQueuedCompletionStatus
函数从完成端口获取任务:如果
dwTransferred
为 0 且CompletionKey
为NULL
,表示线程准备退出。否则,根据
CompletionKey
获取任务参数pParam
。
根据
pParam->nOperator
的值执行不同的操作:IocpListPush
:将数据添加到队列lstString
中。IocpListPop
:从队列中取出数据,并调用回调函数cbFunc
。IocpListEmpty
:清空队列。
最后释放任务参数
pParam
。
4. 回调函数 func
void func(void* arg) {
std::string* pstr = (std::string*)arg;
if (pstr != NULL) {
printf("pop from list :%s\r\n", pstr->c_str());
delete pstr;
}
else {
printf("list is empty,no data!\r\n");
}
}
这是一个简单的回调函数,用于处理从队列中取出的数据:
如果
pstr
不为空,打印数据并释放内存。如果为空,打印队列为空的消息。
5. 主函数 main
int main()
{
if (!CTool::Init()) return 1;
printf("press any key to exit...\r\n");
HANDLE hIOCP = INVALID_HANDLE_VALUE; // I/O 完成端口
hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); // 创建完成端口
HANDLE hThread = (HANDLE)_beginthread(threadQueueEntry, 0, hIOCP); // 创建线程
DWORD tick = GetTickCount64();
while (_kbhit() != 0) { // 检测按键
if (GetTickCount64() - tick > 1300) {
PostQueuedCompletionStatus(hIOCP, sizeof(IOCP_PARAM), (ULONG_PTR)new IOCP_PARAM(IocpListPop, "hello world"), NULL);
}
if (GetTickCount64() - tick > 2000) {
PostQueuedCompletionStatus(hIOCP, sizeof(IOCP_PARAM), (ULONG_PTR)new IOCP_PARAM(IocpListPush, "hello world"), NULL);
tick = GetTickCount64();
}
Sleep(1);
}
if (hIOCP != NULL) {
PostQueuedCompletionStatus(hIOCP, 0, NULL, NULL); // 唤醒完成端口
WaitForSingleObject(hIOCP, INFINITE);
}
CloseHandle(hIOCP);
printf("exit done!\r\n");
::exit(0);
}
主函数的主要逻辑:
初始化:调用
CTool::Init()
进行初始化(假设这是一个工具类)。创建完成端口:使用
CreateIoCompletionPort
创建一个完成端口,指定线程池大小为 1。创建线程:通过
_beginthread
创建一个线程,线程函数为threadQueueEntry
。任务调度:
使用
PostQueuedCompletionStatus
向完成端口提交任务。每隔 1300 毫秒提交一个
IocpListPop
任务。每隔 2000 毫秒提交一个
IocpListPush
任务。
退出逻辑:
按下任意键后,通过
PostQueuedCompletionStatus
提交一个退出信号(dwTransferred
为 0,CompletionKey
为NULL
)。等待完成端口线程退出。
关闭完成端口句柄。
总结
这段代码通过 Windows 的 I/O 完成端口(IOCP)实现了一个简单的任务队列系统。它利用 IOCP 的多线程能力,将任务分发到线程池中执行,并通过一个线程安全的队列管理任务数据。代码的核心在于:
使用
CreateIoCompletionPort
创建完成端口。使用
PostQueuedCompletionStatus
提交任务。在线程中通过
GetQueuedCompletionStatus
获取任务并处理。
这种设计可以高效地处理并发任务,适用于需要高性能 I/O 操作的场景。