/**
 * @file   workqueue.cpp
 * @author Nat Goodspeed
 * @date   2021-10-06
 * @brief  Implementation for WorkQueue.
 *
 * $LicenseInfo:firstyear=2021&license=viewerlgpl$
 * Copyright (c) 2021, Linden Research, Inc.
 * $/LicenseInfo$
 */
|
|
|
|
// Precompiled header
|
|
#include "linden_common.h"
|
|
// associated header
|
|
#include "workqueue.h"
|
|
// STL headers
|
|
// std headers
|
|
// external library headers
|
|
// other Linden headers
|
|
#include "llcoros.h"
|
|
#include LLCOROS_MUTEX_HEADER
|
|
#include "llerror.h"
|
|
#include "llexception.h"
|
|
#include "stringize.h"
|
|
|
|
// File-local shorthand for the lock and mutex types supplied by LLCoros;
// both are used by makeName() below to guard its name counter.
using Lock  = LLCoros::LockType;
using Mutex = LLCoros::Mutex;
|
|
|
|
/**
 * Construct a WorkQueue.
 *
 * @param name      requested instance name; it is passed through makeName(),
 *                  which substitutes a generated name when this is empty,
 *                  and the result is handed to the base-class constructor.
 * @param capacity  forwarded unchanged to the mQueue constructor.
 */
LL::WorkQueue::WorkQueue(const std::string& name, size_t capacity)
    : super(makeName(name))
    , mQueue(capacity)
{
    // TODO: register for "LLApp" events so we can implicitly close() on
    // viewer shutdown.
}
|
|
|
|
void LL::WorkQueue::close()
|
|
{
|
|
mQueue.close();
|
|
}
|
|
|
|
size_t LL::WorkQueue::size()
|
|
{
|
|
return mQueue.size();
|
|
}
|
|
|
|
bool LL::WorkQueue::isClosed()
|
|
{
|
|
return mQueue.isClosed();
|
|
}
|
|
|
|
bool LL::WorkQueue::done()
|
|
{
|
|
return mQueue.done();
|
|
}
|
|
|
|
void LL::WorkQueue::runUntilClose()
|
|
{
|
|
try
|
|
{
|
|
for (;;)
|
|
{
|
|
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
|
|
callWork(mQueue.pop());
|
|
}
|
|
}
|
|
catch (const Queue::Closed&)
|
|
{
|
|
}
|
|
}
|
|
|
|
bool LL::WorkQueue::runPending()
|
|
{
|
|
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
|
|
for (Work work; mQueue.tryPop(work); )
|
|
{
|
|
callWork(work);
|
|
}
|
|
return ! mQueue.done();
|
|
}
|
|
|
|
bool LL::WorkQueue::runOne()
|
|
{
|
|
Work work;
|
|
if (mQueue.tryPop(work))
|
|
{
|
|
callWork(work);
|
|
}
|
|
return ! mQueue.done();
|
|
}
|
|
|
|
bool LL::WorkQueue::runUntil(const TimePoint& until)
|
|
{
|
|
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
|
|
// Should we subtract some slop to allow for typical Work execution time?
|
|
// How much slop?
|
|
// runUntil() is simply a time-bounded runPending().
|
|
for (Work work; TimePoint::clock::now() < until && mQueue.tryPop(work); )
|
|
{
|
|
callWork(work);
|
|
}
|
|
return ! mQueue.done();
|
|
}
|
|
|
|
std::string LL::WorkQueue::makeName(const std::string& name)
|
|
{
|
|
if (! name.empty())
|
|
return name;
|
|
|
|
static U32 discriminator = 0;
|
|
static Mutex mutex;
|
|
U32 num;
|
|
{
|
|
// Protect discriminator from concurrent access by different threads.
|
|
// It can't be thread_local, else two racing threads will come up with
|
|
// the same name.
|
|
Lock lk(mutex);
|
|
num = discriminator++;
|
|
}
|
|
return STRINGIZE("WorkQueue" << num);
|
|
}
|
|
|
|
/**
 * Tuple overload of callWork(): unpack element 0 of @a work and pass it on
 * to the overload that actually runs a work item.
 *
 * @param work  tuple as delivered by the queue's pop().
 */
void LL::WorkQueue::callWork(const Queue::DataTuple& work)
{
    // ThreadSafeSchedule::pop() always delivers a tuple, even when
    // there's only one data field per item, as for us.
    const auto& payload = std::get<0>(work);
    callWork(payload);
}
|
|
|
|
void LL::WorkQueue::callWork(const Work& work)
|
|
{
|
|
LL_PROFILE_ZONE_SCOPED_CATEGORY_THREAD;
|
|
try
|
|
{
|
|
work();
|
|
}
|
|
catch (...)
|
|
{
|
|
// No matter what goes wrong with any individual work item, the worker
|
|
// thread must go on! Log our own instance name with the exception.
|
|
LOG_UNHANDLED_EXCEPTION(getKey());
|
|
}
|
|
}
|
|
|
|
void LL::WorkQueue::error(const std::string& msg)
|
|
{
|
|
LL_ERRS("WorkQueue") << msg << LL_ENDL;
|
|
}
|
|
|
|
void LL::WorkQueue::checkCoroutine(const std::string& method)
|
|
{
|
|
// By convention, the default coroutine on each thread has an empty name
|
|
// string. See also LLCoros::logname().
|
|
if (LLCoros::getName().empty())
|
|
{
|
|
LLTHROW(Error("Do not call " + method + " from a thread's default coroutine"));
|
|
}
|
|
}
|