phoenix-firestorm/indra/llcommon/workqueue.cpp

351 lines
8.6 KiB
C++

/**
* @file workqueue.cpp
* @author Nat Goodspeed
* @date 2021-10-06
* @brief Implementation for WorkQueue.
*
* $LicenseInfo:firstyear=2021&license=viewerlgpl$
* Copyright (c) 2021, Linden Research, Inc.
* $/LicenseInfo$
*/
// Precompiled header
#include "linden_common.h"
// associated header
#include "workqueue.h"
// STL headers
// std headers
// external library headers
// other Linden headers
#include "llapp.h"
#include "llcoros.h"
#include LLCOROS_MUTEX_HEADER
#include "llerror.h"
#include "llevents.h"
#include "llexception.h"
#include "stringize.h"
using Mutex = LLCoros::Mutex;
using Lock = LLCoros::LockType;
/*****************************************************************************
* WorkQueueBase
*****************************************************************************/
LL::WorkQueueBase::WorkQueueBase(const std::string& name, bool auto_shutdown)
: super(makeName(name))
{
if (auto_shutdown)
{
// Register for "LLApp" events so we can implicitly close() on viewer shutdown
std::string listener_name = "WorkQueue:" + getKey();
LLEventPumps::instance().obtain("LLApp").listen(
listener_name,
[this](const LLSD& stat)
{
std::string status(stat["status"]);
if (status != "running")
{
// Viewer is shutting down, close this queue
LL_DEBUGS("WorkQueue") << getKey() << " closing on app shutdown" << LL_ENDL;
close();
}
return false;
});
// Store the listener name so we can unregister in the destructor
mListenerName = listener_name;
}
}
LL::WorkQueueBase::~WorkQueueBase()
{
if (!mListenerName.empty() && !LLEventPumps::wasDeleted())
{
LLEventPumps::instance().obtain("LLApp").stopListening(mListenerName);
}
}
void LL::WorkQueueBase::runUntilClose()
{
try
{
for (;;)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
callWork(pop_());
}
}
catch (const Closed&)
{
}
}
bool LL::WorkQueueBase::runPending()
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
for (Work work; tryPop_(work); )
{
callWork(work);
}
return ! done();
}
bool LL::WorkQueueBase::runOne()
{
Work work;
if (tryPop_(work))
{
callWork(work);
}
return ! done();
}
bool LL::WorkQueueBase::runUntil(const TimePoint& until)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
// Should we subtract some slop to allow for typical Work execution time?
// How much slop?
// runUntil() is simply a time-bounded runPending().
for (Work work; TimePoint::clock::now() < until && tryPop_(work); )
{
callWork(work);
}
return ! done();
}
std::string LL::WorkQueueBase::makeName(const std::string& name)
{
if (! name.empty())
return name;
static U32 discriminator = 0;
static Mutex mutex;
U32 num;
{
// Protect discriminator from concurrent access by different threads.
// It can't be thread_local, else two racing threads will come up with
// the same name.
Lock lk(mutex);
num = discriminator++;
}
return STRINGIZE("WorkQueue" << num);
}
namespace
{
#if LL_WINDOWS
static const U32 STATUS_MSC_EXCEPTION = 0xE06D7363; // compiler specific
U32 exception_filter(U32 code, struct _EXCEPTION_POINTERS* exception_infop)
{
if (LLApp::instance()->reportCrashToBugsplat((void*)exception_infop))
{
// Handled
return EXCEPTION_CONTINUE_SEARCH;
}
else if (code == STATUS_MSC_EXCEPTION)
{
// C++ exception, go on
return EXCEPTION_CONTINUE_SEARCH;
}
else
{
// handle it, convert to std::exception
return EXCEPTION_EXECUTE_HANDLER;
}
return EXCEPTION_CONTINUE_SEARCH;
}
void cpphandle(const LL::WorkQueueBase::Work& work)
{
// SE and C++ can not coexists, thus two handlers
try
{
work();
}
catch (const LLContinueError&)
{
// Any uncaught exception derived from LLContinueError will be caught
// here and logged. This coroutine will terminate but the rest of the
// viewer will carry on.
LOG_UNHANDLED_EXCEPTION(STRINGIZE("LLContinue in work queue"));
}
}
void sehandle(const LL::WorkQueueBase::Work& work)
{
__try
{
// handle stop and continue exceptions first
cpphandle(work);
}
__except (exception_filter(GetExceptionCode(), GetExceptionInformation()))
{
// convert to C++ styled exception
char integer_string[512];
sprintf(integer_string, "SEH, code: %lu\n", GetExceptionCode());
throw std::exception(integer_string);
}
}
#endif // LL_WINDOWS
} // anonymous namespace
void LL::WorkQueueBase::callWork(const Work& work)
{
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
#ifdef LL_WINDOWS
// can not use __try directly, toplevel requires unwinding, thus use of a wrapper
sehandle(work);
#else // LL_WINDOWS
try
{
work();
}
catch (LLContinueError&)
{
LOG_UNHANDLED_EXCEPTION(getKey());
}
catch (...)
{
// Stash any other kind of uncaught exception to be rethrown by main thread.
LL_WARNS("LLCoros") << "Capturing and rethrowing uncaught exception in WorkQueueBase "
<< getKey() << LL_ENDL;
LL::WorkQueue::ptr_t main_queue = LL::WorkQueue::getInstance("mainloop");
main_queue->post(
// Bind the current exception, rethrow it in main loop.
[exc = std::current_exception()]() { std::rethrow_exception(exc); });
}
#endif // else LL_WINDOWS
}
void LL::WorkQueueBase::error(const std::string& msg)
{
LL_ERRS("WorkQueue") << msg << LL_ENDL;
}
void LL::WorkQueueBase::checkCoroutine(const std::string& method)
{
// By convention, the default coroutine on each thread has an empty name
// string. See also LLCoros::logname().
if (LLCoros::getName().empty())
{
LLTHROW(Error("Do not call " + method + " from a thread's default coroutine"));
}
}
/*****************************************************************************
* WorkQueue
*****************************************************************************/
LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity, bool auto_shutdown):
super(name, auto_shutdown),
mQueue(capacity)
{
}
void LL::WorkQueue::close()
{
mQueue.close();
}
size_t LL::WorkQueue::size()
{
return mQueue.size();
}
bool LL::WorkQueue::isClosed()
{
return mQueue.isClosed();
}
bool LL::WorkQueue::done()
{
return mQueue.done();
}
bool LL::WorkQueue::post(const Work& callable)
{
return mQueue.pushIfOpen(callable);
}
bool LL::WorkQueue::tryPost(const Work& callable)
{
return mQueue.tryPush(callable);
}
LL::WorkQueue::Work LL::WorkQueue::pop_()
{
return mQueue.pop();
}
bool LL::WorkQueue::tryPop_(Work& work)
{
return mQueue.tryPop(work);
}
/*****************************************************************************
* WorkSchedule
*****************************************************************************/
LL::WorkSchedule::WorkSchedule(const std::string& name, size_t capacity, bool auto_shutdown):
super(name, auto_shutdown),
mQueue(capacity)
{
}
void LL::WorkSchedule::close()
{
mQueue.close();
}
size_t LL::WorkSchedule::size()
{
return mQueue.size();
}
bool LL::WorkSchedule::isClosed()
{
return mQueue.isClosed();
}
bool LL::WorkSchedule::done()
{
return mQueue.done();
}
bool LL::WorkSchedule::post(const Work& callable)
{
// Use TimePoint::clock::now() instead of TimePoint's representation of
// the epoch because this WorkSchedule may contain a mix of past-due
// TimedWork items and TimedWork items scheduled for the future. Sift this
// new item into the correct place.
return post(callable, TimePoint::clock::now());
}
bool LL::WorkSchedule::post(const Work& callable, const TimePoint& time)
{
return mQueue.pushIfOpen(TimedWork(time, callable));
}
bool LL::WorkSchedule::tryPost(const Work& callable)
{
return tryPost(callable, TimePoint::clock::now());
}
bool LL::WorkSchedule::tryPost(const Work& callable, const TimePoint& time)
{
return mQueue.tryPush(TimedWork(time, callable));
}
LL::WorkSchedule::Work LL::WorkSchedule::pop_()
{
return std::get<0>(mQueue.pop());
}
bool LL::WorkSchedule::tryPop_(Work& work)
{
return mQueue.tryPop(work);
}