共计 5697 个字符,预计需要花费 15 分钟才能阅读完成。
前提:
IOCP 的整体编程模型跟上面的纯重叠 io 非常类似. 纯重叠 io 使用 OVERLAPPED + APC 函数完成.
这种模型的缺点是必须让调用 apc 函数进入 alterable 状态. 而 IOCP 解决了这个问题.IOCP 让我们自己创建一些线程,
然后调用 GetQueuedCompletionStatus 来告诉我们某个 io 操作完成, 就像是在另一个线程中执行了 APC 函数一样;
使用 IOCP 的时候, 一般情况下需要自己创建额外的线程, 用于等待结果完成 (GetQueuedCompletionStatus)
使用到的函数:
CreateIoCompletionPort : 创建 / 关联一个完成端口 .
第 3 个参数是一个自定义数据, 第 4 个是最多 N 个线程可被调用;
注意与其关联的 HANDLE 必须要有 OVERLAPPED 属性的
// 创建一个完成端口
HANDLE hComp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)
// 关联到完成端口. 第 3 个参数是一个自定义数据
// 在 GetQueuedCompletionStatus 将携带这些数据返回. 这个自定义数据将一直与此套接字绑定在了一起
CreateIoCompletionPort((HANDLE)client_socket, hComp, (DWORD)pSockData, 0);
GetQueuedCompletionStatus:一旦类似 WSARecv / WSASend 完成后 . 用此函数获取结果, 就想 APC 函数一样, 一旦完成 io 操作就调用. 此函数一般情况都在某一个线程中使用. 注意一旦在某个线程中调用了此函数, 这意味着,
该线程就像被指派给了 IOCP 一样, 供 IOCP 使用. 总之这个行为就想 APC 函数在另一个线程被调用了;
关于解除关联: 一旦一个套接字关闭了 , closehandle /closesocket. 就将从 IOCP 的设备句柄列表中解除关联了
关于线程:
CreateIoCompletionPort 最后一个参数用于指定 IOCP 最多执行 N 个线程 (如果是 0 则使用默认 CPU 的核数). 但一般情况下, 我会预留一些额外的线程. 比如
我的 CPU 是 4 核即 IOCP 最多可使用 4 个线程 , 不过一般情况下会创建 8 个线程, 给 IOCP 预留 额外 4 个线程 . 原因是如果 IOCP
有 5 个任务已经完成, 最多只有 4 个线程被唤醒. 如果其中某个线程调用了 WaitForSingleObject 之类的函数 , 此时 IOCP 将唤醒额外的线程来处理第 5 个任务;
先补充一下. 对于 WSARecv / WSASend 的 OVERLAPPED 操作, 简称为投递操作. 意思是让操作系统去干活, 至于什么时候干完.
GetQueuedCompletionStatus 会通知你 (即返回) . 因此因此, 需要注意, 这些参数像 WSABUF 和 OVERLAPPED 一定要 new / malloc 在堆中;
代码中都有注释: 另代码中有很多返回都没判断. 这个例子仅仅解释如何编写 IOCP
#include “stdafx.h”
#include <process.h>
#include “../utils.h” // 包含了一些宏和一些打印错误信息的函数.
#define BUFFSIZE 8192
#define Read 0
#define Write 1
// 自定义数据 . 注意 结构的地址 与 第一个成员的地址相同
struct IOData
{
WSAOVERLAPPED overlapped; // 每个 io 操作都需要独立的一个 overlapped
WSABUF wsabuf; // 读写各一份
int rw_mode; // 判断读写操作
char * buf; // 真正存放数据的地方, 需要初始化
};
// 自定义数据. 保存客户套接字和地址
struct SocketData
{
SOCKET hClientSocket; // 客户端套接字
SOCKADDR_IN clientAddr;
IOData * pRead; // 2 个指针, 只是为了在线程中方便使用添加的
IOData * pWrite;
};
// 用于交换 2 个 buf
int swapBuf(WSABUF * a, WSABUF * b)
{
BOOL ret = FALSE;
if (a && b){
char * buf = a->buf;
a->buf = b->buf;
b->buf = buf;
ret = TRUE;
}
return ret;
}
// 释放内存, 解除关联
void freeMem(SocketData * pSockData)
{
closesocket(pSockData->hClientSocket);
free(pSockData->pRead->buf);
free(pSockData->pWrite->buf);
free(pSockData->pWrite);
free(pSockData->pRead);
free(pSockData);
}
unsigned int WINAPI completeRoutine(void * param)
{
// 完成端口
HANDLE hCom = (HANDLE)param;
SocketData * pSockData = NULL;
IOData * pIOData = NULL;
DWORD flags = 0, bytes = 0;
BOOL ret = 0;
SOCKET hClientSocket = NULL;
printf(“tid:%ld start!\n”, GetCurrentThreadId());
while (1)
{
flags = 0;
// 直到有任务完成即返回
ret = GetQueuedCompletionStatus(hCom, &bytes,
(PULONG_PTR)&pSockData,
(LPOVERLAPPED *)&pIOData,
INFINITE);
printf(“GetQueuedCompletionStatus : %d , diy key : %p , pIOData:%p,mode:%d\n”, ret, pSockData,
pIOData,pIOData->rw_mode);
// 如果成功了
if (ret)
{
hClientSocket = pSockData->hClientSocket;
// 如果是 WSARecv 的
if (Read == pIOData->rw_mode)
{
printf(“READ – > bytesRecved:%ld, high:%ld\n”, bytes, pIOData->overlapped.InternalHigh);
// 对端关闭
if (0 == bytes)
{
printf(“peer closed\n”);
freeMem(pSockData); // 释放内存
continue;
}
// 测试数据
pSockData->pRead->buf[bytes] = 0;
printf(“Read buf:%s\n”, pSockData->pRead->buf);
// 交换指针, 把 recv 的 buf 给 write 的 buf;
// 把 write 的 buf 交换给 recv . 如果并发量不大的时候可以这么做
swapBuf(&pIOData->wsabuf, &pSockData->pWrite->wsabuf);
// 回传操作. 清空 write OVERLAPPED
memset(&pSockData->pWrite->overlapped, 0, sizeof(WSAOVERLAPPED));
pSockData->pWrite->wsabuf.len = bytes;
WSASend(hClientSocket, &pSockData->pWrite->wsabuf,
1, NULL, 0, &pSockData->pWrite->overlapped, NULL);
// 再次投递一个 recv 操作, 等待下次客户端发送
memset(&pSockData->pRead->overlapped, 0, sizeof(WSAOVERLAPPED));
pSockData->pRead->wsabuf.len = BUFFSIZE;
WSARecv(hClientSocket, &pSockData->pRead->wsabuf, 1, NULL, &flags,
&pSockData->pRead->overlapped, NULL);
}
else {
// send 完成.
printf(“Send finsished – > bytes:%ld, high:%ld\n”, bytes, pIOData->overlapped.InternalHigh);
memset(&pIOData->overlapped, 0, sizeof(WSAOVERLAPPED));
}
}
else{
// 一旦出错, 解除绑定即删除内存
print_error(GetLastError());
freeMem(pSockData);
}
}
return 0;
}
int _tmain(int argc, _TCHAR* argv[])
{
WSADATA wsadata;
if (WSAStartup(MAKEWORD(2, 2), &wsadata) != 0){
print_error(WSAGetLastError());
return 0;
}
SYSTEM_INFO sysinfo;
GetSystemInfo(&sysinfo);
// 指定线程数量. 一般 processors * 2
const DWORD nThreads = sysinfo.dwNumberOfProcessors * 2;
// 创建一个完成端口 , 前 3 个参数保证了创建一个独立的完成端口, 最后一个参数指定了完成
// 端口可使用的线程数. 0 使用当前 cpu 核数
HANDLE hCom = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// 准备一些线程供完成端口调用, 把完成端口同时传入
HANDLE * arr_threads = new HANDLE[nThreads];
for (int i = 0; i < sysinfo.dwNumberOfProcessors; ++i)
arr_threads[i] = (HANDLE)_beginthreadex(NULL, 0, completeRoutine, (void*)hCom, 0, NULL);
// 创建一个支持 OVERLAPPED 的 socket. 这样的属性将被 accept 返回的 socket 所继承
SOCKET hListenSocket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
SOCKADDR_IN serv_addr, client_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);
serv_addr.sin_addr.s_addr = INADDR_ANY;
bind(hListenSocket, (SOCKADDR*)&serv_addr, sizeof(serv_addr));
listen(hListenSocket, BACKLOG);
SOCKET client_socket;
int client_addr_size = 0;
DWORD flags = 0;
while (1){
client_addr_size = sizeof(client_addr);
flags = 0;
client_socket = accept(hListenSocket, (SOCKADDR*)&client_addr, &client_addr_size);
puts(“accepted”);
// 准备一份数据, 用于保存 clientsocket, addr, 以及读写指针;
SocketData * pSockData = (SocketData *)malloc(sizeof(SocketData));
pSockData->pRead = NULL;
pSockData->pWrite = NULL;
pSockData->hClientSocket = client_socket;
memcpy(&pSockData->clientAddr, &client_addr, client_addr_size);
// 准备数据
IOData * pRead = (IOData *)malloc(sizeof(IOData));
// 对于 OVERLAPPED, 需要额外注意, 清 0
memset(&pRead->overlapped, 0, sizeof(WSAOVERLAPPED));
pRead->buf = (char *)malloc(BUFFSIZE);
pRead->rw_mode = Read;
pRead->wsabuf.buf = pRead->buf;
pRead->wsabuf.len = BUFFSIZE;
pSockData->pRead = pRead;
IOData *pWrite = (IOData *)malloc(sizeof(IOData));
pWrite->buf = (char *)malloc(BUFFSIZE);
memset(&pWrite->overlapped, 0, sizeof(WSAOVERLAPPED));
pWrite->rw_mode = Write;
pWrite->wsabuf.buf = pWrite->buf;
pWrite->wsabuf.len = BUFFSIZE;
pSockData->pWrite = pWrite;
// 与 iocp 关联在一起. 注意第 3 个参数, 把自定义数据一起传递过去
CreateIoCompletionPort((HANDLE)client_socket, hCom, (DWORD)pSockData, 0);
WSARecv(client_socket, &pRead->wsabuf, 1, NULL, &flags, &pRead->overlapped, NULL);
}
return 0;
}