[강의정리] 쓰레드 풀
=> 필요성
- 네트워크 프로그래밍에서 사용자가 늘어날 때마다 쓰레드를 만들 수는 없다.
어떤 프로그램을 만들 때 쓰레드를 몇 개 만들 것인가에 대한 설계가 필요하다.
[소스]
#include "StdAfx.h"
#include "ThreadQueuePool.h"
#include <process.h> // _beginthreadex
/// Builds an empty pool object: no semaphore, no stop event, no thread-handle
/// array and a size of zero. The only resource acquired here is the critical
/// section CS; everything else is left for Create().
CThreadQueuePool::CThreadQueuePool(void)
    : m_hQueueSemaphore(NULL)
    , m_hStopEvent(NULL)
    , m_phThreads(NULL)
    , m_dwSize(0)
{
    // Must be initialized before any Enter/LeaveCriticalSection call on CS.
    InitializeCriticalSection(&CS);
}
/// Releases everything the pool owns.
///
/// Order matters: worker threads read this object's handles and enter CS, so
/// they are stopped first; only then are the handle array, the kernel handles
/// and the critical section released.
CThreadQueuePool::~CThreadQueuePool(void)
{
    // Stop workers that are still running. The extra conditions skip the call
    // when there is nothing to signal or nothing to wait for (pool never
    // created, already stopped, or left half-built by a failed Create()).
    if (NULL != m_phThreads && NULL != m_hStopEvent && 0 < m_dwSize)
    {
        StopThread();
    }

    if (NULL != m_phThreads)
    {
        // The array comes from `new HANDLE[nSize]`, so the array form of
        // delete is required; plain `delete` is undefined behavior.
        delete[] m_phThreads;
        m_phThreads = NULL;
    }

    // The semaphore and the stop event were previously never closed on the
    // success path, leaking two kernel handles per pool.
    if (NULL != m_hQueueSemaphore)
    {
        CloseHandle(m_hQueueSemaphore);
        m_hQueueSemaphore = NULL;
    }
    if (NULL != m_hStopEvent)
    {
        CloseHandle(m_hStopEvent);
        m_hStopEvent = NULL;
    }

    DeleteCriticalSection(&CS);
}
/// Creates the pool's synchronization objects and starts nSize worker threads.
///
/// @param nSize  Number of worker threads to start; must be greater than 0.
/// @return ERROR_SUCCESS when every thread was started,
///         ERROR_INVALID_PARAMETER when nSize <= 0,
///         ERROR_ALREADY_INITIALIZED when worker threads already exist,
///         ERROR_CAN_NOT_COMPLETE when a kernel object or thread could not be
///         created. On any failure the pool is returned to its empty state.
DWORD CThreadQueuePool::Create(int nSize)
{
    if (0 >= nSize)
    {
        return ERROR_INVALID_PARAMETER;
    }

    // Starting a second set of workers would overwrite (and leak) the handles
    // of the first set, which could then never be stopped.
    if (NULL != m_phThreads)
    {
        return ERROR_ALREADY_INITIALIZED;
    }

    // No workers exist at this point, so handles left open by an earlier
    // Create()/StopThread() cycle are unused; close them instead of leaking
    // them when they are overwritten below.
    if (NULL != m_hQueueSemaphore)
    {
        CloseHandle(m_hQueueSemaphore);
        m_hQueueSemaphore = NULL;
    }
    if (NULL != m_hStopEvent)
    {
        CloseHandle(m_hStopEvent);
        m_hStopEvent = NULL;
    }

    // Declared before the first goto so no initialization is jumped over.
    DWORD dwRet = ERROR_SUCCESS;
    int nStarted = 0; // number of worker threads successfully started

    // Initial count 0: workers block until InsertItem() releases the semaphore.
    m_hQueueSemaphore = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
    if (NULL == m_hQueueSemaphore)
    {
        dwRet = ERROR_CAN_NOT_COMPLETE;
        goto QUIT;
    }

    // Manual-reset, initially non-signaled: once set it stays signaled, so a
    // single SetEvent() is seen by every worker.
    m_hStopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    if (NULL == m_hStopEvent)
    {
        dwRet = ERROR_CAN_NOT_COMPLETE;
        goto QUIT;
    }

    m_phThreads = new HANDLE[nSize];
    for (nStarted = 0; nStarted < nSize; nStarted++)
    {
        UINT uiThreadId = 0;
        m_phThreads[nStarted] = reinterpret_cast<HANDLE>(
            _beginthreadex(NULL,
                           0,
                           CThreadQueuePool::ThreadFunc,
                           this,
                           0,
                           &uiThreadId));
        if (NULL == m_phThreads[nStarted])
        {
            dwRet = ERROR_CAN_NOT_COMPLETE;
            goto QUIT;
        }
    }

    // Publish the size only once every worker is running.
    m_dwSize = nSize;

QUIT:
    if (ERROR_SUCCESS != dwRet)
    {
        if (NULL != m_phThreads)
        {
            // Reap the workers that did start before their handles and the
            // objects they wait on are destroyed. Waiting one handle at a time
            // also works when no thread was started at all.
            const BOOL bSignaled = (0 < nStarted) ? SetEvent(m_hStopEvent) : FALSE;
            for (int i = 0; i < nStarted; i++)
            {
                if (bSignaled)
                {
                    WaitForSingleObject(m_phThreads[i], INFINITE);
                }
                CloseHandle(m_phThreads[i]);
            }
            delete[] m_phThreads;
            m_phThreads = NULL;
        }
        m_dwSize = 0;

        if (NULL != m_hQueueSemaphore)
        {
            CloseHandle(m_hQueueSemaphore);
            m_hQueueSemaphore = NULL;
        }
        if (NULL != m_hStopEvent)
        {
            CloseHandle(m_hStopEvent);
            m_hStopEvent = NULL;
        }
    }
    return dwRet;
}
/// Appends one work item to the queue and wakes one worker for it.
///
/// @param dwData  Value to enqueue.
/// @return ERROR_SUCCESS when the item was queued and the semaphore released,
///         ERROR_CAN_NOT_COMPLETE when the pool has no semaphore or the
///         semaphore could not be released.
DWORD CThreadQueuePool::InsertItem(DWORD dwData)
{
    // Without a semaphore no worker can be woken for the item. Reject it
    // before queuing so the queue does not end up holding an entry that the
    // semaphore count does not account for.
    if (NULL == m_hQueueSemaphore)
    {
        ASSERT(FALSE);
        return ERROR_CAN_NOT_COMPLETE;
    }

    // The queue is shared with the worker threads; guard the append with CS.
    EnterCriticalSection(&CS);
    m_aryItemQueue.Add(dwData);
    LeaveCriticalSection(&CS);

    // One release == one queued item: exactly one waiting worker is woken.
    if (FALSE == ReleaseSemaphore(m_hQueueSemaphore, 1, NULL))
    {
        // NOTE(review): the item stays queued although no worker was
        // signaled for it; it is picked up by a later wake-up.
        ASSERT(FALSE);
        return ERROR_CAN_NOT_COMPLETE;
    }
    return ERROR_SUCCESS;
}
/// Signals the stop event, waits for every worker thread to exit and releases
/// the thread handles.
///
/// @return ERROR_SUCCESS (== WAIT_OBJECT_0) when there were no threads or all
///         of them terminated; otherwise the failing wait result (for example
///         WAIT_FAILED). On failure the handle array is kept so the call can
///         be retried.
DWORD CThreadQueuePool::StopThread(void)
{
    if (NULL == m_phThreads)
        return ERROR_SUCCESS;

    const DWORD dwThreadCount = m_dwSize;

    // If the stop event cannot be signaled the workers never wake up and the
    // INFINITE waits below would hang forever, so bail out instead.
    if (0 < dwThreadCount && (NULL == m_hStopEvent || !SetEvent(m_hStopEvent)))
    {
        ASSERT(FALSE);
        return WAIT_FAILED;
    }

    // Wait for the threads one by one. Unlike a single WaitForMultipleObjects
    // call this has no MAXIMUM_WAIT_OBJECTS limit and is valid for zero
    // threads; with an INFINITE timeout the outcome is the same "all exited".
    DWORD dwReturn = WAIT_OBJECT_0;
    for (DWORD i = 0; i < dwThreadCount; i++)
    {
        const DWORD dwWait = WaitForSingleObject(m_phThreads[i], INFINITE);
        if (WAIT_OBJECT_0 != dwWait)
        {
            dwReturn = dwWait;
            break;
        }
    }

    if (WAIT_OBJECT_0 == dwReturn)
    {
        AfxTrace(TEXT("All Thread terminate \r\n"));

        // Handles returned by _beginthreadex stay open until closed explicitly.
        for (DWORD i = 0; i < dwThreadCount; i++)
        {
            CloseHandle(m_phThreads[i]);
        }

        // Allocated with new HANDLE[], so the array form of delete is required.
        delete[] m_phThreads;
        m_phThreads = NULL;
    }
    else
    {
        ASSERT(FALSE);
    }
    return dwReturn;
}
/// Worker thread entry point.
///
/// Blocks on the queue semaphore and the stop event. Each time the semaphore
/// is signaled it takes one item from the queue under the pool's critical
/// section; when the stop event is signaled, or the wait itself fails, the
/// loop ends.
///
/// @param pParam  Pointer to the owning CThreadQueuePool.
/// @return 1 after a normal run, 0 when pParam is NULL.
unsigned int __stdcall CThreadQueuePool::ThreadFunc(void* pParam)
{
    CThreadQueuePool* pThis = static_cast<CThreadQueuePool*>(pParam);
    if (NULL == pThis)
    {
        // Nothing to wait on without the owning pool.
        ASSERT(FALSE);
        return 0;
    }

    HANDLE hObj[INDEX_END] = {0,};
    hObj[INDEX_QUEUE] = pThis->m_hQueueSemaphore;
    hObj[INDEX_STOP] = pThis->m_hStopEvent;

    DWORD dwRet = 0;   // value of the most recently fetched item
    DWORD dwCount = 0; // number of semaphore wake-ups handled by this thread
    BOOL bGo = TRUE;

    while (bGo)
    {
        // Wait for either handle (bWaitAll = FALSE) with no timeout.
        const DWORD dwResult = WaitForMultipleObjects(INDEX_END, hObj, FALSE, INFINITE);
        switch (dwResult)
        {
        // hObj[INDEX_QUEUE] was signaled: one item has been queued.
        case WAIT_OBJECT_0 + INDEX_QUEUE:
            dwCount++;
            EnterCriticalSection(&pThis->CS);
            dwRet = pThis->GetWorkItem();
            LeaveCriticalSection(&pThis->CS);
            // The fetched item would be processed here.
            break;

        // hObj[INDEX_STOP] was signaled: shut down.
        case WAIT_OBJECT_0 + INDEX_STOP:
            bGo = FALSE;
            break;

        default:
            // Unexpected result such as WAIT_FAILED. Retrying the same wait
            // would fail again immediately and spin at full CPU in release
            // builds (where ASSERT compiles away), so leave the loop.
            ASSERT(FALSE);
            bGo = FALSE;
            break;
        }
    }

    AfxTrace(TEXT("[%d] Semaphore Use Thread Pool Data, Count[%d, %d] \r\n"), GetCurrentThreadId(), dwRet, dwCount);
    return 1;
}
/// Removes the oldest item (index 0) from the queue and returns it.
///
/// This function takes no lock itself; the queue is shared, so the caller is
/// expected to hold CS around the call.
///
/// @return The removed item, or 0 when the queue is empty.
DWORD CThreadQueuePool::GetWorkItem(void)
{
    // Nothing queued: report 0 and leave the queue untouched.
    if (m_aryItemQueue.GetSize() <= 0)
    {
        return 0;
    }

    const DWORD dwFront = m_aryItemQueue[0];
    m_aryItemQueue.RemoveAt(0);
    return dwFront;
}
-------------------------------------------------------------------------------------
=> 세마포어를 이용하여 Queue방식으로 쓴다. 세마포어 개념 또한 Count개념이므로...
=> Queue에 데이터를 넣고 뺄 때의 동기화는 Critical Section으로 한다.
=> 세마포어는 하나만 만들며, 초기 카운트는 0이고 최대 카운트는 LONG_MAX이다(소스 참조).
=> Insert부분에서 Queue에 Critical Section으로 동기화하고 다른 쓰레드에 쓰라고 통보(ReleaseSemaphore)해준다.
=> 생성한 5개의 쓰레드 중에 하나가 깨어나서 Queue의 데이터를 꺼내서 처리한다.