DRTVWR-476: Revert "Use LLThreadSafeQueue, not boost::fibers::buffered_channel."
This reverts commit bf8aea5059.
Try boost::fibers::buffered_channel again with Boost 1.72.
master
parent
dbfbe5419c
commit
9d428662f8
|
|
@ -33,7 +33,7 @@
|
|||
|
||||
#include <chrono>
|
||||
|
||||
#include "llthreadsafequeue.h"
|
||||
#include <boost/fiber/buffered_channel.hpp>
|
||||
|
||||
#include "llexception.h"
|
||||
#include "stringize.h"
|
||||
|
|
@ -105,7 +105,10 @@ private:
|
|||
CoProcedure_t mProc;
|
||||
};
|
||||
|
||||
typedef LLThreadSafeQueue<QueuedCoproc::ptr_t> CoprocQueue_t;
|
||||
// we use a buffered_channel here rather than unbuffered_channel since we want to be able to
|
||||
// push values without blocking,even if there's currently no one calling a pop operation (due to
|
||||
// fiber running right now)
|
||||
typedef boost::fibers::buffered_channel<QueuedCoproc::ptr_t> CoprocQueue_t;
|
||||
// Use shared_ptr to control the lifespan of our CoprocQueue_t instance
|
||||
// because the consuming coroutine might outlive this LLCoprocedurePool
|
||||
// instance.
|
||||
|
|
@ -321,13 +324,14 @@ LLUUID LLCoprocedurePool::enqueueCoprocedure(const std::string &name, LLCoproced
|
|||
LLUUID id(LLUUID::generateNewID());
|
||||
|
||||
LL_INFOS("CoProcMgr") << "Coprocedure(" << name << ") enqueuing with id=" << id.asString() << " in pool \"" << mPoolName << "\" at " << mPending << LL_ENDL;
|
||||
auto pushed = mPendingCoprocs->tryPushFront(boost::make_shared<QueuedCoproc>(name, id, proc));
|
||||
// We don't really have a lot of good options if tryPushFront() failed,
|
||||
// perhaps because the consuming coroutines are gummed up or something. This
|
||||
auto pushed = mPendingCoprocs->try_push(boost::make_shared<QueuedCoproc>(name, id, proc));
|
||||
// We don't really have a lot of good options if try_push() failed,
|
||||
// perhaps because the consuming coroutine is gummed up or something. This
|
||||
// method is probably called from code called by mainloop. If we toss an
|
||||
// llcoro::suspend() call here, we'll circle back for another mainloop
|
||||
// iteration, possibly resulting in being re-entered here. Let's avoid that.
|
||||
LL_ERRS_IF(! pushed, "CoProcMgr") << "Enqueue failed" << LL_ENDL;
|
||||
LL_ERRS_IF(pushed != boost::fibers::channel_op_status::success, "CoProcMgr")
|
||||
<< "Enqueue failed because queue is " << int(pushed) << LL_ENDL;
|
||||
++mPending;
|
||||
|
||||
return id;
|
||||
|
|
@ -339,19 +343,24 @@ void LLCoprocedurePool::coprocedureInvokerCoro(
|
|||
LLCoreHttpUtil::HttpCoroutineAdapter::ptr_t httpAdapter)
|
||||
{
|
||||
QueuedCoproc::ptr_t coproc;
|
||||
boost::fibers::channel_op_status status;
|
||||
for (;;)
|
||||
{
|
||||
try
|
||||
{
|
||||
LLCoros::TempStatus st("waiting for work");
|
||||
coproc = pendingCoprocs->popBack();
|
||||
LLCoros::TempStatus st("waiting for work for 10s");
|
||||
status = pendingCoprocs->pop_wait_for(coproc, std::chrono::seconds(10));
|
||||
}
|
||||
catch (const LLThreadSafeQueueError&)
|
||||
if (status == boost::fibers::channel_op_status::closed)
|
||||
{
|
||||
// queue is closed
|
||||
break;
|
||||
}
|
||||
|
||||
if(status == boost::fibers::channel_op_status::timeout)
|
||||
{
|
||||
LL_INFOS_ONCE() << "pool '" << mPoolName << "' stalled." << LL_ENDL;
|
||||
continue;
|
||||
}
|
||||
// we actually popped an item
|
||||
--mPending;
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue