DarkflameServer/thirdparty/raknet/Source/ThreadPool.h

552 lines
16 KiB
C
Raw Permalink Normal View History

#ifndef __THREAD_POOL_H
#define __THREAD_POOL_H
#include "RakMemoryOverride.h"
#include "DS_Queue.h"
#include "SimpleMutex.h"
#include "Export.h"
#include "RakThread.h"
#ifdef _MSC_VER
#pragma warning( push )
#endif
/// A simple class to create worker threads that processes a queue of functions with data.
/// This class does not allocate or deallocate memory. It is up to the user to handle memory management.
/// InputType and OutputType are stored directly in a queue. For large structures, if you plan to delete from the middle of the queue,
/// you might wish to store pointers rather than the structures themselves so the array can shift efficiently.
template <class InputType, class OutputType>
struct RAK_DLL_EXPORT ThreadPool : public RakNet::RakMemoryOverride
{
ThreadPool();
~ThreadPool();
/// Start the specified number of threads.
/// \param[in] numThreads The number of threads to start
/// \param[in] stackSize 0 for default (except on consoles).
/// \param[in] _perThreadDataFactory User callback to return data stored per thread. Pass 0 if not needed.
/// \param[in] _perThreadDataDestructor User callback to destroy data stored per thread, created by _perThreadDataFactory. Pass 0 if not needed.
/// \return True on success, false on failure.
bool StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)()=0, void (*_perThreadDataDestructor)(void*)=0);
/// Stops all threads
void StopThreads(void);
/// Adds a function to a queue with data to pass to that function. This function will be called from the thread
/// Memory management is your responsibility! This class does not allocate or deallocate memory.
/// The best way to deallocate \a inputData is in userCallback. If you call EndThreads such that callbacks were not called, you
/// can iterate through the inputQueue and deallocate all pending input data there
/// The best way to deallocate output is as it is returned to you from GetOutput. Similarly, if you end the threads such that
/// not all output was returned, you can iterate through outputQueue and deallocate it there.
/// \param[in] workerThreadCallback The function to call from the thread
/// \param[in] inputData The parameter to pass to \a userCallback
void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData);
/// Returns true if output from GetOutput is waiting.
/// \return true if output is waiting, false otherwise
bool HasOutput(void);
/// Inaccurate but fast version of HasOutput. If this returns true, you should still check HasOutput for the real value.
/// \return true if output is probably waiting, false otherwise
bool HasOutputFast(void);
/// Returns true if input from GetInput is waiting.
/// \return true if input is waiting, false otherwise
bool HasInput(void);
/// Inaccurate but fast version of HasInput. If this returns true, you should still check HasInput for the real value.
/// \return true if input is probably waiting, false otherwise
bool HasInputFast(void);
/// Gets the output of a call to \a userCallback
/// HasOutput must return true before you call this function. Otherwise it will assert.
/// \return The output of \a userCallback. If you have different output signatures, it is up to you to encode the data to indicate this
OutputType GetOutput(void);
/// Clears internal buffers
void Clear(void);
/// Lock the input buffer before calling the functions InputSize, InputAtIndex, and RemoveInputAtIndex
/// It is only necessary to lock the input or output while the threads are running
void LockInput(void);
/// Unlock the input buffer after you are done with the functions InputSize, GetInputAtIndex, and RemoveInputAtIndex
void UnlockInput(void);
/// Length of the input queue
unsigned InputSize(void);
/// Get the input at a specified index
InputType GetInputAtIndex(unsigned index);
/// Remove input from a specific index. This does NOT do memory deallocation - it only removes the item from the queue
void RemoveInputAtIndex(unsigned index);
/// Lock the output buffer before calling the functions OutputSize, OutputAtIndex, and RemoveOutputAtIndex
/// It is only necessary to lock the input or output while the threads are running
void LockOutput(void);
/// Unlock the output buffer after you are done with the functions OutputSize, GetOutputAtIndex, and RemoveOutputAtIndex
void UnlockOutput(void);
/// Length of the output queue
unsigned OutputSize(void);
/// Get the output at a specified index
OutputType GetOutputAtIndex(unsigned index);
/// Remove output from a specific index. This does NOT do memory deallocation - it only removes the item from the queue
void RemoveOutputAtIndex(unsigned index);
/// Removes all items from the input queue
void ClearInput(void);
/// Removes all items from the output queue
void ClearOutput(void);
/// Are any of the threads working, or is input or output available?
bool IsWorking(void);
/// The number of currently active threads.
int NumThreadsWorking(void);
/// Have the threads been signaled to be stopped?
bool WasStopped(void);
protected:
// It is valid to cancel input before it is processed. To do so, lock the inputQueue with inputQueueMutex,
// Scan the list, and remove the item you don't want.
SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex;
void* (*perThreadDataFactory)();
void (*perThreadDataDestructor)(void*);
// inputFunctionQueue & inputQueue are paired arrays so if you delete from one at a particular index you must delete from the other
// at the same index
DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> inputFunctionQueue;
DataStructures::Queue<InputType> inputQueue;
DataStructures::Queue<OutputType> outputQueue;
template <class ThreadInputType, class ThreadOutputType>
friend RAK_THREAD_DECLARATION(WorkerThread);
/*
#ifdef _WIN32
friend unsigned __stdcall WorkerThread( LPVOID arguments );
#else
friend void* WorkerThread( void* arguments );
#endif
*/
/// \internal
bool runThreads;
/// \internal
int numThreadsRunning;
/// \internal
int numThreadsWorking;
/// \internal
SimpleMutex numThreadsRunningMutex;
#ifdef _WIN32
/// \internal
HANDLE quitAndIncomingDataEvents[2];
#endif
};
#include "ThreadPool.h"
#include "RakSleep.h"
#ifdef _WIN32
#else
#include <unistd.h>
#endif
#ifdef _MSC_VER
#pragma warning(disable:4127)
#pragma warning( disable : 4701 ) // potentially uninitialized local variable 'inputData' used
#endif
template <class ThreadInputType, class ThreadOutputType>
RAK_THREAD_DECLARATION(WorkerThread)
/*
#ifdef _WIN32
unsigned __stdcall WorkerThread( LPVOID arguments )
#else
void* WorkerThread( void* arguments )
#endif
*/
{
bool returnOutput;
ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments;
ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
ThreadInputType inputData;
ThreadOutputType callbackOutput;
userCallback=0;
void *perThreadData;
if (threadPool->perThreadDataFactory)
perThreadData=threadPool->perThreadDataFactory();
else
perThreadData=0;
// Increase numThreadsRunning
threadPool->numThreadsRunningMutex.Lock();
++threadPool->numThreadsRunning;
threadPool->numThreadsRunningMutex.Unlock();
while (1)
{
#ifdef _WIN32
if (userCallback==0)
{
// Wait for signaled event
WaitForMultipleObjects(
2,
threadPool->quitAndIncomingDataEvents,
false,
INFINITE);
}
#endif
threadPool->runThreadsMutex.Lock();
if (threadPool->runThreads==false)
{
threadPool->runThreadsMutex.Unlock();
break;
}
threadPool->runThreadsMutex.Unlock();
threadPool->workingThreadCountMutex.Lock();
++threadPool->numThreadsWorking;
threadPool->workingThreadCountMutex.Unlock();
// Read input data
userCallback=0;
threadPool->inputQueueMutex.Lock();
if (threadPool->inputFunctionQueue.Size())
{
userCallback=threadPool->inputFunctionQueue.Pop();
inputData=threadPool->inputQueue.Pop();
}
threadPool->inputQueueMutex.Unlock();
if (userCallback)
{
callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
if (returnOutput)
{
threadPool->outputQueueMutex.Lock();
threadPool->outputQueue.Push(callbackOutput);
threadPool->outputQueueMutex.Unlock();
}
}
threadPool->workingThreadCountMutex.Lock();
--threadPool->numThreadsWorking;
threadPool->workingThreadCountMutex.Unlock();
#ifndef _WIN32
// If no input data available, and GCC, then sleep.
if (userCallback==0)
RakSleep(1000);
#endif
}
// Decrease numThreadsRunning
threadPool->numThreadsRunningMutex.Lock();
--threadPool->numThreadsRunning;
threadPool->numThreadsRunningMutex.Unlock();
if (threadPool->perThreadDataDestructor)
threadPool->perThreadDataDestructor(perThreadData);
return 0;
}
template <class InputType, class OutputType>
ThreadPool<InputType, OutputType>::ThreadPool()
{
runThreads=false;
numThreadsRunning=0;
}
template <class InputType, class OutputType>
ThreadPool<InputType, OutputType>::~ThreadPool()
{
StopThreads();
Clear();
}
template <class InputType, class OutputType>
bool ThreadPool<InputType, OutputType>::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *))
{
(void) stackSize;
runThreadsMutex.Lock();
if (runThreads==true)
{
runThreadsMutex.Unlock();
return false;
}
runThreadsMutex.Unlock();
#ifdef _WIN32
quitAndIncomingDataEvents[0]=CreateEvent(0, true, false, 0);
quitAndIncomingDataEvents[1]=CreateEvent(0, false, false, 0);
#endif
perThreadDataFactory=_perThreadDataFactory;
perThreadDataDestructor=_perThreadDataDestructor;
runThreadsMutex.Lock();
runThreads=true;
runThreadsMutex.Unlock();
numThreadsWorking=0;
unsigned threadId = 0;
(void) threadId;
int i;
for (i=0; i < numThreads; i++)
{
int errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);
if (errorCode!=0)
{
StopThreads();
return false;
}
}
// Wait for number of threads running to increase to numThreads
bool done=false;
while (done==false)
{
RakSleep(50);
numThreadsRunningMutex.Lock();
if (numThreadsRunning==numThreads)
done=true;
numThreadsRunningMutex.Unlock();
}
return true;
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::StopThreads(void)
{
runThreadsMutex.Lock();
if (runThreads==false)
{
runThreadsMutex.Unlock();
return;
}
runThreads=false;
runThreadsMutex.Unlock();
#ifdef _WIN32
// Quit event
SetEvent(quitAndIncomingDataEvents[0]);
#endif
// Wait for number of threads running to decrease to 0
bool done=false;
while (done==false)
{
RakSleep(50);
numThreadsRunningMutex.Lock();
if (numThreadsRunning==0)
done=true;
numThreadsRunningMutex.Unlock();
}
#ifdef _WIN32
CloseHandle(quitAndIncomingDataEvents[0]);
CloseHandle(quitAndIncomingDataEvents[1]);
#endif
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData)
{
inputQueueMutex.Lock();
inputQueue.Push(inputData);
inputFunctionQueue.Push(workerThreadCallback);
inputQueueMutex.Unlock();
#ifdef _WIN32
// Input data event
SetEvent(quitAndIncomingDataEvents[1]);
#endif
}
template <class InputType, class OutputType>
bool ThreadPool<InputType, OutputType>::HasOutputFast(void)
{
return outputQueue.IsEmpty()==false;
}
template <class InputType, class OutputType>
bool ThreadPool<InputType, OutputType>::HasOutput(void)
{
bool res;
outputQueueMutex.Lock();
res=outputQueue.IsEmpty()==false;
outputQueueMutex.Unlock();
return res;
}
template <class InputType, class OutputType>
bool ThreadPool<InputType, OutputType>::HasInputFast(void)
{
return inputQueue.IsEmpty()==false;
}
template <class InputType, class OutputType>
bool ThreadPool<InputType, OutputType>::HasInput(void)
{
bool res;
inputQueueMutex.Lock();
res=inputQueue.IsEmpty()==false;
inputQueueMutex.Unlock();
return res;
}
template <class InputType, class OutputType>
OutputType ThreadPool<InputType, OutputType>::GetOutput(void)
{
// Real output check
OutputType output;
outputQueueMutex.Lock();
output=outputQueue.Pop();
outputQueueMutex.Unlock();
return output;
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::Clear(void)
{
runThreadsMutex.Lock();
if (runThreads)
{
runThreadsMutex.Unlock();
inputQueueMutex.Lock();
inputFunctionQueue.Clear();
inputQueue.Clear();
inputQueueMutex.Unlock();
outputQueueMutex.Lock();
outputQueue.Clear();
outputQueueMutex.Unlock();
}
else
{
inputFunctionQueue.Clear();
inputQueue.Clear();
outputQueue.Clear();
}
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::LockInput(void)
{
inputQueueMutex.Lock();
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::UnlockInput(void)
{
inputQueueMutex.Unlock();
}
template <class InputType, class OutputType>
unsigned ThreadPool<InputType, OutputType>::InputSize(void)
{
return inputQueue.Size();
}
template <class InputType, class OutputType>
InputType ThreadPool<InputType, OutputType>::GetInputAtIndex(unsigned index)
{
return inputQueue[index];
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::RemoveInputAtIndex(unsigned index)
{
inputQueue.RemoveAtIndex(index);
inputFunctionQueue.RemoveAtIndex(index);
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::LockOutput(void)
{
outputQueueMutex.Lock();
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::UnlockOutput(void)
{
outputQueueMutex.Unlock();
}
template <class InputType, class OutputType>
unsigned ThreadPool<InputType, OutputType>::OutputSize(void)
{
return outputQueue.Size();
}
template <class InputType, class OutputType>
OutputType ThreadPool<InputType, OutputType>::GetOutputAtIndex(unsigned index)
{
return outputQueue[index];
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::RemoveOutputAtIndex(unsigned index)
{
outputQueue.RemoveAtIndex(index);
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::ClearInput(void)
{
inputQueue.Clear();
inputFunctionQueue.Clear();
}
template <class InputType, class OutputType>
void ThreadPool<InputType, OutputType>::ClearOutput(void)
{
outputQueue.Clear();
}
template <class InputType, class OutputType>
bool ThreadPool<InputType, OutputType>::IsWorking(void)
{
bool isWorking;
// workingThreadCountMutex.Lock();
// isWorking=numThreadsWorking!=0;
// workingThreadCountMutex.Unlock();
// if (isWorking)
// return true;
// Bug fix: Originally the order of these two was reversed.
// It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks
// here and sees there is no data. So it thinks the thread is not working when it was.
if (HasOutputFast() && HasOutput())
return true;
if (HasInputFast() && HasInput())
return true;
// Need to check is working again, in case the thread was between the first and second checks
workingThreadCountMutex.Lock();
isWorking=numThreadsWorking!=0;
workingThreadCountMutex.Unlock();
return isWorking;
}
template <class InputType, class OutputType>
int ThreadPool<InputType, OutputType>::NumThreadsWorking(void)
{
return numThreadsWorking;
}
template <class InputType, class OutputType>
bool ThreadPool<InputType, OutputType>::WasStopped(void)
{
bool b;
runThreadsMutex.Lock();
b = runThreads;
runThreadsMutex.Unlock();
return b;
}
#ifdef _MSC_VER
#pragma warning( pop )
#endif
#endif