#ifndef __THREAD_POOL_H #define __THREAD_POOL_H #include "RakMemoryOverride.h" #include "DS_Queue.h" #include "SimpleMutex.h" #include "Export.h" #include "RakThread.h" #ifdef _MSC_VER #pragma warning( push ) #endif /// A simple class to create worker threads that processes a queue of functions with data. /// This class does not allocate or deallocate memory. It is up to the user to handle memory management. /// InputType and OutputType are stored directly in a queue. For large structures, if you plan to delete from the middle of the queue, /// you might wish to store pointers rather than the structures themselves so the array can shift efficiently. template struct RAK_DLL_EXPORT ThreadPool : public RakNet::RakMemoryOverride { ThreadPool(); ~ThreadPool(); /// Start the specified number of threads. /// \param[in] numThreads The number of threads to start /// \param[in] stackSize 0 for default (except on consoles). /// \param[in] _perThreadDataFactory User callback to return data stored per thread. Pass 0 if not needed. /// \param[in] _perThreadDataDestructor User callback to destroy data stored per thread, created by _perThreadDataFactory. Pass 0 if not needed. /// \return True on success, false on failure. bool StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)()=0, void (*_perThreadDataDestructor)(void*)=0); /// Stops all threads void StopThreads(void); /// Adds a function to a queue with data to pass to that function. This function will be called from the thread /// Memory management is your responsibility! This class does not allocate or deallocate memory. /// The best way to deallocate \a inputData is in userCallback. If you call EndThreads such that callbacks were not called, you /// can iterate through the inputQueue and deallocate all pending input data there /// The best way to deallocate output is as it is returned to you from GetOutput. Similarly, if you end the threads such that /// not all output was returned, you can iterate through outputQueue and deallocate it there. /// \param[in] workerThreadCallback The function to call from the thread /// \param[in] inputData The parameter to pass to \a userCallback void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData); /// Returns true if output from GetOutput is waiting. /// \return true if output is waiting, false otherwise bool HasOutput(void); /// Inaccurate but fast version of HasOutput. If this returns true, you should still check HasOutput for the real value. /// \return true if output is probably waiting, false otherwise bool HasOutputFast(void); /// Returns true if input from GetInput is waiting. /// \return true if input is waiting, false otherwise bool HasInput(void); /// Inaccurate but fast version of HasInput. If this returns true, you should still check HasInput for the real value. /// \return true if input is probably waiting, false otherwise bool HasInputFast(void); /// Gets the output of a call to \a userCallback /// HasOutput must return true before you call this function. Otherwise it will assert. /// \return The output of \a userCallback. If you have different output signatures, it is up to you to encode the data to indicate this OutputType GetOutput(void); /// Clears internal buffers void Clear(void); /// Lock the input buffer before calling the functions InputSize, InputAtIndex, and RemoveInputAtIndex /// It is only necessary to lock the input or output while the threads are running void LockInput(void); /// Unlock the input buffer after you are done with the functions InputSize, GetInputAtIndex, and RemoveInputAtIndex void UnlockInput(void); /// Length of the input queue unsigned InputSize(void); /// Get the input at a specified index InputType GetInputAtIndex(unsigned index); /// Remove input from a specific index. This does NOT do memory deallocation - it only removes the item from the queue void RemoveInputAtIndex(unsigned index); /// Lock the output buffer before calling the functions OutputSize, OutputAtIndex, and RemoveOutputAtIndex /// It is only necessary to lock the input or output while the threads are running void LockOutput(void); /// Unlock the output buffer after you are done with the functions OutputSize, GetOutputAtIndex, and RemoveOutputAtIndex void UnlockOutput(void); /// Length of the output queue unsigned OutputSize(void); /// Get the output at a specified index OutputType GetOutputAtIndex(unsigned index); /// Remove output from a specific index. This does NOT do memory deallocation - it only removes the item from the queue void RemoveOutputAtIndex(unsigned index); /// Removes all items from the input queue void ClearInput(void); /// Removes all items from the output queue void ClearOutput(void); /// Are any of the threads working, or is input or output available? bool IsWorking(void); /// The number of currently active threads. int NumThreadsWorking(void); /// Have the threads been signaled to be stopped? bool WasStopped(void); protected: // It is valid to cancel input before it is processed. To do so, lock the inputQueue with inputQueueMutex, // Scan the list, and remove the item you don't want. SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex; void* (*perThreadDataFactory)(); void (*perThreadDataDestructor)(void*); // inputFunctionQueue & inputQueue are paired arrays so if you delete from one at a particular index you must delete from the other // at the same index DataStructures::Queue inputFunctionQueue; DataStructures::Queue inputQueue; DataStructures::Queue outputQueue; template friend RAK_THREAD_DECLARATION(WorkerThread); /* #ifdef _WIN32 friend unsigned __stdcall WorkerThread( LPVOID arguments ); #else friend void* WorkerThread( void* arguments ); #endif */ /// \internal bool runThreads; /// \internal int numThreadsRunning; /// \internal int numThreadsWorking; /// \internal SimpleMutex numThreadsRunningMutex; #ifdef _WIN32 /// \internal HANDLE quitAndIncomingDataEvents[2]; #endif }; #include "ThreadPool.h" #include "RakSleep.h" #ifdef _WIN32 #else #include #endif #ifdef _MSC_VER #pragma warning(disable:4127) #pragma warning( disable : 4701 ) // potentially uninitialized local variable 'inputData' used #endif template RAK_THREAD_DECLARATION(WorkerThread) /* #ifdef _WIN32 unsigned __stdcall WorkerThread( LPVOID arguments ) #else void* WorkerThread( void* arguments ) #endif */ { bool returnOutput; ThreadPool *threadPool = (ThreadPool*) arguments; ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*); ThreadInputType inputData; ThreadOutputType callbackOutput; userCallback=0; void *perThreadData; if (threadPool->perThreadDataFactory) perThreadData=threadPool->perThreadDataFactory(); else perThreadData=0; // Increase numThreadsRunning threadPool->numThreadsRunningMutex.Lock(); ++threadPool->numThreadsRunning; threadPool->numThreadsRunningMutex.Unlock(); while (1) { #ifdef _WIN32 if (userCallback==0) { // Wait for signaled event WaitForMultipleObjects( 2, threadPool->quitAndIncomingDataEvents, false, INFINITE); } #endif threadPool->runThreadsMutex.Lock(); if (threadPool->runThreads==false) { threadPool->runThreadsMutex.Unlock(); break; } threadPool->runThreadsMutex.Unlock(); threadPool->workingThreadCountMutex.Lock(); ++threadPool->numThreadsWorking; threadPool->workingThreadCountMutex.Unlock(); // Read input data userCallback=0; threadPool->inputQueueMutex.Lock(); if (threadPool->inputFunctionQueue.Size()) { userCallback=threadPool->inputFunctionQueue.Pop(); inputData=threadPool->inputQueue.Pop(); } threadPool->inputQueueMutex.Unlock(); if (userCallback) { callbackOutput=userCallback(inputData, &returnOutput,perThreadData); if (returnOutput) { threadPool->outputQueueMutex.Lock(); threadPool->outputQueue.Push(callbackOutput); threadPool->outputQueueMutex.Unlock(); } } threadPool->workingThreadCountMutex.Lock(); --threadPool->numThreadsWorking; threadPool->workingThreadCountMutex.Unlock(); #ifndef _WIN32 // If no input data available, and GCC, then sleep. if (userCallback==0) RakSleep(1000); #endif } // Decrease numThreadsRunning threadPool->numThreadsRunningMutex.Lock(); --threadPool->numThreadsRunning; threadPool->numThreadsRunningMutex.Unlock(); if (threadPool->perThreadDataDestructor) threadPool->perThreadDataDestructor(perThreadData); return 0; } template ThreadPool::ThreadPool() { runThreads=false; numThreadsRunning=0; } template ThreadPool::~ThreadPool() { StopThreads(); Clear(); } template bool ThreadPool::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *)) { (void) stackSize; runThreadsMutex.Lock(); if (runThreads==true) { runThreadsMutex.Unlock(); return false; } runThreadsMutex.Unlock(); #ifdef _WIN32 quitAndIncomingDataEvents[0]=CreateEvent(0, true, false, 0); quitAndIncomingDataEvents[1]=CreateEvent(0, false, false, 0); #endif perThreadDataFactory=_perThreadDataFactory; perThreadDataDestructor=_perThreadDataDestructor; runThreadsMutex.Lock(); runThreads=true; runThreadsMutex.Unlock(); numThreadsWorking=0; unsigned threadId = 0; (void) threadId; int i; for (i=0; i < numThreads; i++) { int errorCode = RakNet::RakThread::Create(WorkerThread, this); if (errorCode!=0) { StopThreads(); return false; } } // Wait for number of threads running to increase to numThreads bool done=false; while (done==false) { RakSleep(50); numThreadsRunningMutex.Lock(); if (numThreadsRunning==numThreads) done=true; numThreadsRunningMutex.Unlock(); } return true; } template void ThreadPool::StopThreads(void) { runThreadsMutex.Lock(); if (runThreads==false) { runThreadsMutex.Unlock(); return; } runThreads=false; runThreadsMutex.Unlock(); #ifdef _WIN32 // Quit event SetEvent(quitAndIncomingDataEvents[0]); #endif // Wait for number of threads running to decrease to 0 bool done=false; while (done==false) { RakSleep(50); numThreadsRunningMutex.Lock(); if (numThreadsRunning==0) done=true; numThreadsRunningMutex.Unlock(); } #ifdef _WIN32 CloseHandle(quitAndIncomingDataEvents[0]); CloseHandle(quitAndIncomingDataEvents[1]); #endif } template void ThreadPool::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData) { inputQueueMutex.Lock(); inputQueue.Push(inputData); inputFunctionQueue.Push(workerThreadCallback); inputQueueMutex.Unlock(); #ifdef _WIN32 // Input data event SetEvent(quitAndIncomingDataEvents[1]); #endif } template bool ThreadPool::HasOutputFast(void) { return outputQueue.IsEmpty()==false; } template bool ThreadPool::HasOutput(void) { bool res; outputQueueMutex.Lock(); res=outputQueue.IsEmpty()==false; outputQueueMutex.Unlock(); return res; } template bool ThreadPool::HasInputFast(void) { return inputQueue.IsEmpty()==false; } template bool ThreadPool::HasInput(void) { bool res; inputQueueMutex.Lock(); res=inputQueue.IsEmpty()==false; inputQueueMutex.Unlock(); return res; } template OutputType ThreadPool::GetOutput(void) { // Real output check OutputType output; outputQueueMutex.Lock(); output=outputQueue.Pop(); outputQueueMutex.Unlock(); return output; } template void ThreadPool::Clear(void) { runThreadsMutex.Lock(); if (runThreads) { runThreadsMutex.Unlock(); inputQueueMutex.Lock(); inputFunctionQueue.Clear(); inputQueue.Clear(); inputQueueMutex.Unlock(); outputQueueMutex.Lock(); outputQueue.Clear(); outputQueueMutex.Unlock(); } else { inputFunctionQueue.Clear(); inputQueue.Clear(); outputQueue.Clear(); } } template void ThreadPool::LockInput(void) { inputQueueMutex.Lock(); } template void ThreadPool::UnlockInput(void) { inputQueueMutex.Unlock(); } template unsigned ThreadPool::InputSize(void) { return inputQueue.Size(); } template InputType ThreadPool::GetInputAtIndex(unsigned index) { return inputQueue[index]; } template void ThreadPool::RemoveInputAtIndex(unsigned index) { inputQueue.RemoveAtIndex(index); inputFunctionQueue.RemoveAtIndex(index); } template void ThreadPool::LockOutput(void) { outputQueueMutex.Lock(); } template void ThreadPool::UnlockOutput(void) { outputQueueMutex.Unlock(); } template unsigned ThreadPool::OutputSize(void) { return outputQueue.Size(); } template OutputType ThreadPool::GetOutputAtIndex(unsigned index) { return outputQueue[index]; } template void ThreadPool::RemoveOutputAtIndex(unsigned index) { outputQueue.RemoveAtIndex(index); } template void ThreadPool::ClearInput(void) { inputQueue.Clear(); inputFunctionQueue.Clear(); } template void ThreadPool::ClearOutput(void) { outputQueue.Clear(); } template bool ThreadPool::IsWorking(void) { bool isWorking; // workingThreadCountMutex.Lock(); // isWorking=numThreadsWorking!=0; // workingThreadCountMutex.Unlock(); // if (isWorking) // return true; // Bug fix: Originally the order of these two was reversed. // It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks // here and sees there is no data. So it thinks the thread is not working when it was. if (HasOutputFast() && HasOutput()) return true; if (HasInputFast() && HasInput()) return true; // Need to check is working again, in case the thread was between the first and second checks workingThreadCountMutex.Lock(); isWorking=numThreadsWorking!=0; workingThreadCountMutex.Unlock(); return isWorking; } template int ThreadPool::NumThreadsWorking(void) { return numThreadsWorking; } template bool ThreadPool::WasStopped(void) { bool b; runThreadsMutex.Lock(); b = runThreads; runThreadsMutex.Unlock(); return b; } #ifdef _MSC_VER #pragma warning( pop ) #endif #endif