프로그래밍 세계/Windows Programming

[강의정리] 쓰레드 풀

깡다구짱 2008. 10. 3. 22:22
출처 : 신경준씨 사외교육 http://blog.naver.com/process3/20052147545


=> 필요성

 

  - 네트워크 프로그래밍에서 사용자가 늘어날 때마다 쓰레드를 만들 수는 없다.

    어떤 프로그램을 만들 때 몇개의 쓰레드를 만들 것인가? 에 대한 설계가 필요

 

[소스]

 

#include "StdAfx.h"
#include "ThreadQueuePool.h"
#include <process.h>

// Constructs an idle pool: the queue lock is initialised and every handle,
// the thread-handle array and the worker count start out empty. The semaphore,
// the stop event and the worker threads are only created later, by Create().
CThreadQueuePool::CThreadQueuePool(void)
{
 // Lock used around every access to the work-item queue.
 InitializeCriticalSection(&CS);

 // No workers yet.
 this->m_dwSize = 0;
 this->m_phThreads = NULL;

 // No kernel objects yet.
 this->m_hStopEvent = NULL;
 this->m_hQueueSemaphore = NULL;
}

// Tears the pool down.
//
// If worker threads are still registered (StopThread() was not called, or did
// not complete), the stop event is signalled and each worker is waited for
// before anything is destroyed: the workers use the critical section and the
// handles owned by this object, so destroying those first would leave them
// running on freed state. Every thread handle, the semaphore and the stop
// event are closed, and the handle array is released with delete[] because it
// was allocated with new HANDLE[n].
//
// NOTE: this destructor blocks until the workers have left their loop.
CThreadQueuePool::~CThreadQueuePool(void)
{
 if (NULL != m_phThreads)
 {
  // Without a stop event there is no way to wake the workers, so waiting
  // would never return; in that case only the handles are released.
  const BOOL bCanWait = (NULL != m_hStopEvent);
  if (bCanWait)
  {
   SetEvent(m_hStopEvent);
  }

  for (DWORD i = 0; i < m_dwSize; i++)
  {
   if (NULL == m_phThreads[i])
   {
    continue;
   }

   if (bCanWait)
   {
    // One handle at a time: no MAXIMUM_WAIT_OBJECTS limit applies.
    WaitForSingleObject(m_phThreads[i], INFINITE);
   }
   CloseHandle(m_phThreads[i]);
  }

  delete[] m_phThreads;
  m_phThreads = NULL;
  m_dwSize = 0;
 }

 if (NULL != m_hQueueSemaphore)
 {
  CloseHandle(m_hQueueSemaphore);
  m_hQueueSemaphore = NULL;
 }

 if (NULL != m_hStopEvent)
 {
  CloseHandle(m_hStopEvent);
  m_hStopEvent = NULL;
 }

 DeleteCriticalSection(&CS);
}

DWORD CThreadQueuePool::Create(int nSize)
{
 if (0 >= nSize)
 {
  return ERROR_INVALID_PARAMETER;
 }

 DWORD dwRet = ERROR_SUCCESS;

 m_dwSize = nSize;

 //lInitialCount Value is 0
 m_hQueueSemaphore = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
 if (NULL == m_hQueueSemaphore)
 {
  dwRet = ERROR_CAN_NOT_COMPLETE;
  goto QUIT;
 }
 
 //bManualReset Value is TRUE
 m_hStopEvent = CreateEvent(NULL, TRUE, FALSE, NULL );
 if (NULL == m_hStopEvent)
 {
  dwRet = ERROR_CAN_NOT_COMPLETE;
  goto QUIT;
 }

 //Allocate Thread Handle
 m_phThreads = new HANDLE[nSize];

 for (int i = 0; i < nSize; i++)
 {
  // create thread
  UINT uiThreadId = 0;

  m_phThreads[i] = (HANDLE)_beginthreadex(NULL,
   0,
   CThreadQueuePool::ThreadFunc,
   this,
   0,
   &uiThreadId);

  if (NULL == m_phThreads[i])
  {
   dwRet = ERROR_CAN_NOT_COMPLETE;
   m_dwSize = i;
   StopThread();
   goto QUIT;
  }

 }

QUIT:

 if (ERROR_SUCCESS != dwRet)
 {
  if (NULL != m_hQueueSemaphore)
  {
   CloseHandle(m_hQueueSemaphore);
   m_hQueueSemaphore = NULL;
  }

  if (NULL != m_hStopEvent)
  {
   CloseHandle(m_hStopEvent);
   m_hStopEvent = NULL;
  }
 }

 return dwRet;
}

// Appends one work item to the queue and releases the queue semaphore once so
// that a waiting worker wakes up for it.
//
// Parameters:
//   dwData - the work item to enqueue.
//
// Returns:
//   ERROR_SUCCESS           - the item is queued and one semaphore count was
//                             released for it.
//   ERROR_CAN_NOT_COMPLETE  - ReleaseSemaphore failed (for example because the
//                             semaphore has not been created); the item has
//                             been removed from the queue again.
//
// The append and the release happen under the same lock. If the release
// fails, the element that is rolled back is therefore guaranteed to be the one
// added by this call, and the number of queued items stays equal to the number
// of semaphore counts handed out.
DWORD CThreadQueuePool::InsertItem(DWORD dwData)
{
 DWORD dwRet = ERROR_SUCCESS;

 EnterCriticalSection(&CS);

 m_aryItemQueue.Add(dwData);

 if (FALSE == ReleaseSemaphore(m_hQueueSemaphore, 1, NULL))
 {
  // No worker will ever be woken for this item: take it back out so it
  // cannot be handed to a worker in place of a later item.
  m_aryItemQueue.RemoveAt(m_aryItemQueue.GetSize() - 1);
  dwRet = ERROR_CAN_NOT_COMPLETE;
 }

 LeaveCriticalSection(&CS);

 // Assert outside the lock so a debug assertion dialog does not stall workers.
 ASSERT(ERROR_SUCCESS == dwRet);

 return dwRet;
}

// Signals the stop event, waits until every worker thread has exited, then
// closes the thread handles and frees the handle array.
//
// Returns:
//   ERROR_SUCCESS - there were no workers, or all of them have exited.
//   otherwise     - the failing WaitForMultipleObjects result (e.g.
//                   WAIT_FAILED); the handles and the array are left untouched
//                   so the call can be retried.
//
// WaitForMultipleObjects accepts at most MAXIMUM_WAIT_OBJECTS handles per
// call, so the handles are waited for in batches. A pool with zero started
// threads simply releases its array.
DWORD CThreadQueuePool::StopThread(void)
{
 if (NULL == m_phThreads)
  return ERROR_SUCCESS;

 const DWORD dwMaxBatch = MAXIMUM_WAIT_OBJECTS;

 SetEvent(m_hStopEvent);

 DWORD dwDone = 0;
 while (dwDone < m_dwSize)
 {
  DWORD dwBatch = m_dwSize - dwDone;
  if (dwMaxBatch < dwBatch)
  {
   dwBatch = dwMaxBatch;
  }

  const DWORD dwWait = WaitForMultipleObjects(dwBatch, m_phThreads + dwDone, TRUE, INFINITE);

  // With bWaitAll == TRUE any value in [WAIT_OBJECT_0, WAIT_OBJECT_0 + count)
  // means that all handles of the batch are signalled.
  if (WAIT_OBJECT_0 + dwBatch <= dwWait)
  {
   ASSERT(FALSE);
   return dwWait;
  }

  dwDone += dwBatch;
 }

 AfxTrace(TEXT("All Thread terminate \r\n"));

 // Handles returned by _beginthreadex must be closed by the caller.
 for (DWORD i = 0; i < m_dwSize; i++)
 {
  CloseHandle(m_phThreads[i]);
 }

 // Allocated with new HANDLE[n], so it must be released with delete[].
 delete[] m_phThreads;
 m_phThreads = NULL;
 m_dwSize = 0;

 return ERROR_SUCCESS;
}

unsigned int __stdcall CThreadQueuePool::ThreadFunc(void* pParam)
{
 //You must check pParam Value...

 HANDLE hObj[INDEX_END] = {0,};
 DWORD dwResult = 0;
 CThreadQueuePool* pThis = (CThreadQueuePool*)pParam;
 BOOL bGo = TRUE;
 LPVOID lpWorkItem = NULL;
 DWORD dwRet = 0;
 DWORD dwCount = 0;


 hObj[INDEX_QUEUE] = pThis->m_hQueueSemaphore;
 hObj[INDEX_STOP] = pThis->m_hStopEvent;

 while(bGo)
 {
  //This Thread Wait Two Handle
  //bWaitAll value is FALSE
  //dwMilliseconds(WaitTime) value is INFINITE
  dwResult = WaitForMultipleObjects(INDEX_END, hObj, FALSE, INFINITE);

  switch(dwResult)
  {
   // hObj[INDEX_QUEUE] was signaled.
  case WAIT_OBJECT_0 + INDEX_QUEUE:
   //AfxTrace(TEXT("[%d] Semaphore signaled and then Thread Excute \r\n"), GetCurrentThreadId());

   dwCount++;

   {
    EnterCriticalSection(&pThis->CS);
    dwRet = pThis->GetWorkItem();    
    LeaveCriticalSection(&pThis->CS);    

   }
   
   //Doing WorkItem
   
   //Sleep(1000);
   break;

   // hObj[INDEX_STOP] was signaled.
  case WAIT_OBJECT_0 + INDEX_STOP:
   bGo = FALSE;
   break;

  default:
   ASSERT(FALSE);
   break;
  }

 }

 AfxTrace(TEXT("[%d] Semaphore Use Thread Pool Data, Count[%d, %d] \r\n"), GetCurrentThreadId(), dwRet, dwCount);
 
 return 1;
}
// Removes the element at the front of the item queue and returns it.
//
// Returns 0 when the queue is empty, so a stored value of 0 cannot be told
// apart from "nothing queued".
//
// This function takes no lock itself; ThreadFunc calls it while holding the
// pool's critical section.
DWORD CThreadQueuePool::GetWorkItem(void)
{
 // Nothing queued.
 if (m_aryItemQueue.GetSize() <= 0)
 {
  return 0;
 }

 // Take the oldest entry, then drop it from the queue.
 DWORD dwFront = m_aryItemQueue[0];
 m_aryItemQueue.RemoveAt(0);

 return dwFront;
}

-------------------------------------------------------------------------------------

 

=> 세마포어를 이용하여 Queue방식으로 쓴다. 세마포어 개념 또한 Count개념이므로...

 

=> 동기화는 Critical Section을 사용한다. Critical Section으로 보호한 상태에서 Queue에 데이터를 넣고 뺀다.

 

=> 세마포어는 하나만 만들며, 최대 카운트를 LONG_MAX(소스 참조)로, 초기 카운트를 0으로 지정한다.

 

=> Insert부분에서 Queue에 Critical Section으로 동기화하고 다른 쓰레드에 쓰라고 통보(ReleaseSemaphore)해준다.

 

=> 생성한 5개의 쓰레드 중에 하나가 깨어나서 Queue의 데이터를 꺼내서 처리한다.