SL-10291 Replace apr thread with standard C++11 functionality

master
andreykproductengine 2019-01-15 22:27:28 +02:00
parent af0e498293
commit 8a92a771ba
5 changed files with 104 additions and 203 deletions

View File

@ -116,29 +116,27 @@ void LLThread::registerThreadID()
//
// Handed to the APR thread creation function
//
void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap)
void LLThread::threadRun()
{
LLThread *threadp = (LLThread *)datap;
#ifdef LL_WINDOWS
set_thread_name(-1, threadp->mName.c_str());
set_thread_name(-1, mName.c_str());
#endif
// for now, hard code all LLThreads to report to single master thread recorder, which is known to be running on main thread
threadp->mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder());
mRecorder = new LLTrace::ThreadRecorder(*LLTrace::get_master_thread_recorder());
sThreadID = threadp->mID;
sThreadID = mID;
// Run the user supplied function
do
{
try
{
threadp->run();
run();
}
catch (const LLContinueError &e)
{
LL_WARNS("THREAD") << "ContinueException on thread '" << threadp->mName <<
LL_WARNS("THREAD") << "ContinueException on thread '" << mName <<
"' reentering run(). Error what is: '" << e.what() << "'" << LL_ENDL;
//output possible call stacks to log file.
LLError::LLCallStacks::print();
@ -153,39 +151,25 @@ void *APR_THREAD_FUNC LLThread::staticRun(apr_thread_t *apr_threadp, void *datap
//LL_INFOS() << "LLThread::staticRun() Exiting: " << threadp->mName << LL_ENDL;
delete threadp->mRecorder;
threadp->mRecorder = NULL;
delete mRecorder;
mRecorder = NULL;
// We're done with the run function, this thread is done executing now.
//NB: we are using this flag to sync across threads...we really need memory barriers here
// Todo: add LLMutex per thread instead of flag?
// We are using "while (mStatus != STOPPED) {ms_sleep();}" everywhere.
threadp->mStatus = STOPPED;
return NULL;
mStatus = STOPPED;
}
LLThread::LLThread(const std::string& name, apr_pool_t *poolp) :
mPaused(FALSE),
mName(name),
mAPRThreadp(NULL),
mThreadp(NULL),
mStatus(STOPPED),
mRecorder(NULL)
{
mID = ++sIDIter;
// Thread creation probably CAN be paranoid about APR being initialized, if necessary
if (poolp)
{
mIsLocalPool = FALSE;
mAPRPoolp = poolp;
}
else
{
mIsLocalPool = TRUE;
apr_pool_create(&mAPRPoolp, NULL); // Create a subpool for this thread
}
mRunCondition = new LLCondition();
mDataLock = new LLMutex();
mLocalAPRFilePoolp = NULL ;
@ -217,7 +201,7 @@ void LLThread::shutdown()
// Warning! If you somehow call the thread destructor from itself,
// the thread will die in an unclean fashion!
if (mAPRThreadp)
if (mThreadp)
{
if (!isStopped())
{
@ -248,14 +232,19 @@ void LLThread::shutdown()
{
// This thread just wouldn't stop, even though we gave it time
//LL_WARNS() << "LLThread::~LLThread() exiting thread before clean exit!" << LL_ENDL;
// Put a stake in its heart.
apr_thread_exit(mAPRThreadp, -1);
// Put a stake in its heart. (A very hostile method to force a thread to quit)
#if LL_WINDOWS
TerminateThread(mNativeHandle, 0);
#else
pthread_cancel(mNativeHandle);
#endif
delete mRecorder;
mRecorder = NULL;
mStatus = STOPPED;
return;
}
mAPRThreadp = NULL;
mThreadp = NULL;
}
delete mRunCondition;
@ -263,12 +252,6 @@ void LLThread::shutdown()
delete mDataLock;
mDataLock = NULL;
if (mIsLocalPool && mAPRPoolp)
{
apr_pool_destroy(mAPRPoolp);
mAPRPoolp = 0;
}
if (mRecorder)
{
@ -287,19 +270,15 @@ void LLThread::start()
// Set thread state to running
mStatus = RUNNING;
apr_status_t status =
apr_thread_create(&mAPRThreadp, NULL, staticRun, (void *)this, mAPRPoolp);
if(status == APR_SUCCESS)
{
// We won't bother joining
apr_thread_detach(mAPRThreadp);
try
{
mThreadp = new std::thread(std::bind(&LLThread::threadRun, this));
mNativeHandle = mThreadp->native_handle();
}
else
catch (std::system_error& ex)
{
mStatus = STOPPED;
LL_WARNS() << "failed to start thread " << mName << LL_ENDL;
ll_apr_warn_status(status);
LL_WARNS() << "failed to start thread " << mName << " " << ex.what() << LL_ENDL;
}
}
@ -376,11 +355,7 @@ U32 LLThread::currentID()
// static
void LLThread::yield()
{
#if LL_LINUX || LL_SOLARIS
sched_yield(); // annoyingly, apr_thread_yield is a noop on linux...
#else
apr_thread_yield();
#endif
std::this_thread::yield();
}
void LLThread::wake()

View File

@ -29,10 +29,10 @@
#include "llapp.h"
#include "llapr.h"
#include "apr_thread_cond.h"
#include "boost/intrusive_ptr.hpp"
#include "llmutex.h"
#include "llrefcount.h"
#include <thread>
LL_COMMON_API void assert_main_thread();
@ -86,7 +86,6 @@ public:
// this kicks off the apr thread
void start(void);
apr_pool_t *getAPRPool() { return mAPRPoolp; }
LLVolatileAPRPool* getLocalAPRFilePool() { return mLocalAPRFilePoolp ; }
U32 getID() const { return mID; }
@ -97,19 +96,18 @@ public:
static void registerThreadID();
private:
BOOL mPaused;
bool mPaused;
std::thread::native_handle_type mNativeHandle; // for termination in case of issues
// static function passed to APR thread creation routine
static void *APR_THREAD_FUNC staticRun(struct apr_thread_t *apr_threadp, void *datap);
void threadRun();
protected:
std::string mName;
class LLCondition* mRunCondition;
LLMutex* mDataLock;
apr_thread_t *mAPRThreadp;
apr_pool_t *mAPRPoolp;
BOOL mIsLocalPool;
std::thread *mThreadp;
EThreadStatus mStatus;
U32 mID;
LLTrace::ThreadRecorder* mRecorder;

View File

@ -24,87 +24,6 @@
*/
#include "linden_common.h"
#include <apr_pools.h>
#include <apr_queue.h>
#include "llthreadsafequeue.h"
#include "llexception.h"
// LLThreadSafeQueueImplementation
//-----------------------------------------------------------------------------
LLThreadSafeQueueImplementation::LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity):
mOwnsPool(pool == 0),
mPool(pool),
mQueue(0)
{
if(mOwnsPool) {
apr_status_t status = apr_pool_create(&mPool, 0);
if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate pool"));
} else {
; // No op.
}
apr_status_t status = apr_queue_create(&mQueue, capacity, mPool);
if(status != APR_SUCCESS) LLTHROW(LLThreadSafeQueueError("failed to allocate queue"));
}
LLThreadSafeQueueImplementation::~LLThreadSafeQueueImplementation()
{
if(mQueue != 0) {
if(apr_queue_size(mQueue) != 0) LL_WARNS() <<
"terminating queue which still contains " << apr_queue_size(mQueue) <<
" elements;" << "memory will be leaked" << LL_ENDL;
apr_queue_term(mQueue);
}
if(mOwnsPool && (mPool != 0)) apr_pool_destroy(mPool);
}
void LLThreadSafeQueueImplementation::pushFront(void * element)
{
apr_status_t status = apr_queue_push(mQueue, element);
if(status == APR_EINTR) {
LLTHROW(LLThreadSafeQueueInterrupt());
} else if(status != APR_SUCCESS) {
LLTHROW(LLThreadSafeQueueError("push failed"));
} else {
; // Success.
}
}
bool LLThreadSafeQueueImplementation::tryPushFront(void * element){
return apr_queue_trypush(mQueue, element) == APR_SUCCESS;
}
void * LLThreadSafeQueueImplementation::popBack(void)
{
void * element;
apr_status_t status = apr_queue_pop(mQueue, &element);
if(status == APR_EINTR) {
LLTHROW(LLThreadSafeQueueInterrupt());
} else if(status != APR_SUCCESS) {
LLTHROW(LLThreadSafeQueueError("pop failed"));
} else {
return element;
}
}
bool LLThreadSafeQueueImplementation::tryPopBack(void *& element)
{
return apr_queue_trypop(mQueue, &element) == APR_SUCCESS;
}
size_t LLThreadSafeQueueImplementation::size()
{
return apr_queue_size(mQueue);
}

View File

@ -28,12 +28,20 @@
#define LL_LLTHREADSAFEQUEUE_H
#include "llexception.h"
#include <deque>
#include <string>
#if LL_WINDOWS
#pragma warning (push)
#pragma warning (disable:4265)
#endif
// 'std::_Pad' : class has virtual functions, but destructor is not virtual
#include <mutex>
#include <condition_variable>
struct apr_pool_t; // From apr_pools.h
class LLThreadSafeQueueImplementation; // See below.
#if LL_WINDOWS
#pragma warning (pop)
#endif
//
// A general queue exception.
@ -64,31 +72,6 @@ public:
}
};
struct apr_queue_t; // From apr_queue.h
//
// Implementation details.
//
class LL_COMMON_API LLThreadSafeQueueImplementation
{
public:
LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity);
~LLThreadSafeQueueImplementation();
void pushFront(void * element);
bool tryPushFront(void * element);
void * popBack(void);
bool tryPopBack(void *& element);
size_t size();
private:
bool mOwnsPool;
apr_pool_t * mPool;
apr_queue_t * mQueue;
};
//
// Implements a thread safe FIFO.
//
@ -100,7 +83,7 @@ public:
// If the pool is set to NULL one will be allocated and managed by this
// queue.
LLThreadSafeQueue(apr_pool_t * pool = 0, unsigned int capacity = 1024);
LLThreadSafeQueue(U32 capacity = 1024);
// Add an element to the front of queue (will block if the queue has
// reached capacity).
@ -128,77 +111,103 @@ public:
size_t size();
private:
LLThreadSafeQueueImplementation mImplementation;
std::deque< ElementT > mStorage;
U32 mCapacity;
std::mutex mLock;
std::condition_variable mCapacityCond;
std::condition_variable mEmptyCond;
};
// LLThreadSafeQueue
//-----------------------------------------------------------------------------
template<typename ElementT>
LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(apr_pool_t * pool, unsigned int capacity):
mImplementation(pool, capacity)
LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(U32 capacity) :
mCapacity(capacity)
{
; // No op.
}
template<typename ElementT>
void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
{
ElementT * elementCopy = new ElementT(element);
try {
mImplementation.pushFront(elementCopy);
} catch (LLThreadSafeQueueInterrupt) {
delete elementCopy;
throw;
}
while (true)
{
std::unique_lock<std::mutex> lock1(mLock);
if (mStorage.size() < mCapacity)
{
mStorage.push_front(element);
mEmptyCond.notify_one();
return;
}
// Storage Full. Wait for signal.
mCapacityCond.wait(lock1);
}
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{
ElementT * elementCopy = new ElementT(element);
bool result = mImplementation.tryPushFront(elementCopy);
if(!result) delete elementCopy;
return result;
std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
if (mStorage.size() >= mCapacity)
return false;
mStorage.push_front(element);
mEmptyCond.notify_one();
return true;
}
template<typename ElementT>
ElementT LLThreadSafeQueue<ElementT>::popBack(void)
{
ElementT * element = reinterpret_cast<ElementT *> (mImplementation.popBack());
ElementT result(*element);
delete element;
return result;
while (true)
{
std::unique_lock<std::mutex> lock1(mLock);
if (!mStorage.empty())
{
ElementT value = mStorage.back();
mStorage.pop_back();
mCapacityCond.notify_one();
return value;
}
// Storage empty. Wait for signal.
mEmptyCond.wait(lock1);
}
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
{
void * storedElement;
bool result = mImplementation.tryPopBack(storedElement);
if(result) {
ElementT * elementPtr = reinterpret_cast<ElementT *>(storedElement);
element = *elementPtr;
delete elementPtr;
} else {
; // No op.
}
return result;
std::unique_lock<std::mutex> lock1(mLock, std::defer_lock);
if (!lock1.try_lock())
return false;
if (mStorage.empty())
return false;
element = mStorage.back();
mStorage.pop_back();
mCapacityCond.notify_one();
return true;
}
template<typename ElementT>
size_t LLThreadSafeQueue<ElementT>::size(void)
{
return mImplementation.size();
std::lock_guard<std::mutex> lock(mLock);
return mStorage.size();
}
#endif

View File

@ -46,7 +46,7 @@ void LLMainLoopRepeater::start(void)
{
if(mQueue != 0) return;
mQueue = new LLThreadSafeQueue<LLSD>(gAPRPoolp, 1024);
mQueue = new LLThreadSafeQueue<LLSD>(1024);
mMainLoopConnection = LLEventPumps::instance().
obtain("mainloop").listen(LLEventPump::inventName(), boost::bind(&LLMainLoopRepeater::onMainLoop, this, _1));
mRepeaterConnection = LLEventPumps::instance().