VisualStudio/C++서버

[C++서버] IOCP(Completion Port) 모델

usingsystem 2024. 5. 24. 12:59
728x90

Overlapped 콜백기반은 비동기 입출력 함수가 완료가 되면 스레드마다 있는 APC 큐에 일감이 쌓인다.

Alertable Wait 상태로 들어가면 APC큐를 전체 비운다. (콜백 함수 호출)

APC큐는 쓰레드마다있기 때문에 멀티 스레드 환경에서 적절하게 배분하는데 어려움이 있다.( Alertable Wait 상태로 들어가 APC큐를 비워야 하기 때문이다.)  또 한 Alertable Wait 계속 호출하는 부담이 있다.

 

Overlapped  이벤트 방식으로는 소켓과 이벤트를 1대 1로 대응해야 하고 그리고 감시할 수 있는 수량도 64개 밖에 되지 않기 때문에 많은 수의 이벤트를 관찰하기 어렵다. 

 

이런 문제를 보안하기 위해IOPC(Completion Port) 방식이 나오게 된다.

 

IOPC모델은 Overapped 콜백 기반과 유사한 형식으로 구성된다.

APC큐 대신 Completion Port를 대신 사용하며 APC와 다르게 쓰레드마다 1개가 존재하지 않고 오직 하나만 만들어 중앙에서 Completion Port 가 일감을 관리하게 된다. 그렇기 때문에 IOCP 모델은 멀티 쓰레드 환경과 궁합이 매우좋다.

 

Alertable Wait 대신에 Completion Port 결과 처리를 GetQueuedCompletionStatus로 대체하여 사용한다.

 

Completion Port생성과 관찰 등록

CreateIoCompletionPort

 

Completion Port 결과처리 감시

GetQueuedCompletionStatus

동작 흐름

  1. 포트 생성 및 연결:(2가지 활용)
    • 서버 애플리케이션은 CreateIoCompletionPort를 사용하여 I/O 완료 포트를 생성합니다.
    • 각 클라이언트 소켓을 이 포트에 연결합니다.
  2. 비동기 I/O 작업 시작:
    • 서버는 ReadFile, WriteFile 또는 WSARecv, WSASend와 같은 비동기 I/O 함수를 호출합니다.
    • 이 함수들은 OVERLAPPED 구조체를 사용하여 비동기 I/O 작업을 수행합니다.
  3. I/O 작업 완료 대기:
    • 서버는 작업자 스레드에서 GetQueuedCompletionStatus를 호출하여 I/O 완료 포트에서 완료된 I/O 작업을 기다립니다.
  4. 작업 완료 처리:
    • I/O 작업이 완료되면, GetQueuedCompletionStatus는 완료된 작업의 바이트 수, CompletionKey, OVERLAPPED 구조체를 반환합니다.
    • 서버는 이 정보를 사용하여 완료된 I/O 작업을 처리합니다.
#include "pch.h"
#include <iostream>
#include "CorePch.h"
#include <atomic>
#include <mutex>
#include <windows.h>
#include <future>
#include "ThreadManager.h"

#include <WinSock2.h>
#include <mswsock.h>
#include <ws2tcpip.h>
#pragma comment(lib, "ws2_32.lib")

void HandleError(const char* cause)
{
	int32 errCode = ::WSAGetLastError();
	cout << cause << " ErrorCode" << errCode << endl;
}

const int32 BUFSIZE = 1000;
struct Session
{
	SOCKET socket;
	char recvBuffer[BUFSIZE] = {};
	int32 recvBytes = 0;
};

enum IO_TYPE
{
	READ,
	WRITE,
	ACCEPT,
	CONNECT,
};

struct OverlappedEx
{
	WSAOVERLAPPED overlapped = {};
	int32 type = 0;
};

void WorkerThreadMain(HANDLE iocpHandle)
{
	while (true)
	{
		DWORD bytesTransferred = 0;
		Session* session = nullptr;
		OverlappedEx* overlappedEx = nullptr;

		//이쪽에서 대기 성공하면 밑으로 내려감.
		BOOL ret = ::GetQueuedCompletionStatus(iocpHandle, &bytesTransferred, (ULONG_PTR*)&session, (LPOVERLAPPED*)&overlappedEx, INFINITE);

		if (ret = FALSE || bytesTransferred == 0)
		{
			//TODO : 연결끊김
			continue;
		}

		ASSERT_CRASH(overlappedEx->type == IO_TYPE::READ);
		cout << "Recv Data IOCP =" << bytesTransferred << endl;

		WSABUF wsaBuf;
		wsaBuf.buf = session->recvBuffer;
		wsaBuf.len = BUFSIZE;

		DWORD recvLen = 0;
		DWORD flags = 0;
		//다시 RECV 예약!
		::WSARecv(session->socket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);
	}
}

int main()
{
	//윈소켓 초기화(w2_32 라이브러리 초기화)
	//관련정보가 wsaData에 채워짐
	WSADATA wsaData;
	if (::WSAStartup(MAKEWORD(2, 2), &wsaData) != 0)
		return 0;

	SOCKET listenSocket = ::socket(AF_INET, SOCK_STREAM, 0);
	if (listenSocket == INVALID_SOCKET)
		return 0;

	//나의 주소 : IP주소 + PORT
	SOCKADDR_IN serverAddr;//IPv4
	::memset(&serverAddr, 0, sizeof(serverAddr));
	serverAddr.sin_family = AF_INET;
	serverAddr.sin_addr.s_addr = ::htonl(INADDR_ANY);//아이피 알아서 설정
	serverAddr.sin_port = ::htons(7777);//PORT

	//socket과 주소 연동
	if (::bind(listenSocket, (SOCKADDR*)&serverAddr, sizeof(serverAddr)) == SOCKET_ERROR)
		return 0;

	//서버오픈
	if (::listen(listenSocket, 10) == SOCKET_ERROR)//10은 대기열 이걸넘으면 못들어옴
		return 0;

	cout << "Accept" << endl;

	vector<Session*> sessionManager;

	//CompletionPort 생성
	HANDLE iocpHandle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

	//WorkerThreads 생성 CompletionPort를 관찰하면서 완료된 입출력 함수를 처리한다.(recv 전용)
	for (int32 i = 0; i < 5; i++)
	{
		GthreadManager->Launch([=]()
			{
				WorkerThreadMain(iocpHandle);
			});
	}

	//Main Thread = Accept 담당
	while (true)
	{
		SOCKADDR_IN clientAddr;
		int32 addrLen = sizeof(clientAddr);

		SOCKET	clientSocket = ::accept(listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
		if (clientSocket == INVALID_SOCKET)
			return 0;

		Session* session = new Session();
		session->socket = clientSocket;
		sessionManager.push_back(session);

		cout << "client connected" << endl;

		//소켓을 cp에 등록 관찰시작
		::CreateIoCompletionPort((HANDLE)clientSocket, iocpHandle,  /*key*/(ULONG_PTR)session, 0);

		WSABUF wsaBuf;
		wsaBuf.buf = session->recvBuffer;
		wsaBuf.len = BUFSIZE;

		OverlappedEx* overlappedEx = new OverlappedEx();
		overlappedEx->type = IO_TYPE::READ;

		DWORD recvLen = 0;
		DWORD flags = 0;
		::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);//메인스레드에서 한번 호출 하고 다른 스레드에서 관리. 메인스레드는 다시 accept호출 하로감.
	}

	GthreadManager->Join();
	::WSACleanup();
}

 

::WSARecv(clientSocket, &wsaBuf, 1, &recvLen, &flags, &overlappedEx->overlapped, NULL);

는 외부에서 해당 session이나 overlapped를 삭제하면 안된다. 주소가 사라지기 때문에

728x90