start of a thread safe queue

master
Andrew A. de Laix 2010-11-09 15:04:44 -08:00
parent 590e35a6f5
commit 9ae2891a3a
7 changed files with 470 additions and 0 deletions

View File

@ -92,6 +92,7 @@ set(llcommon_SOURCE_FILES
llstringtable.cpp
llsys.cpp
llthread.cpp
llthreadsafequeue.cpp
lltimer.cpp
lluri.cpp
lluuid.cpp
@ -223,6 +224,7 @@ set(llcommon_HEADER_FILES
llstringtable.h
llsys.h
llthread.h
llthreadsafequeue.h
lltimer.h
lltreeiterators.h
lluri.h

View File

@ -0,0 +1,109 @@
/**
* @file llthread.cpp
*
* $LicenseInfo:firstyear=2004&license=viewerlgpl$
* Second Life Viewer Source Code
* Copyright (C) 2010, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
* $/LicenseInfo$
*/
#include "linden_common.h"
#include <apr_pools.h>
#include <apr_queue.h>
#include "llthreadsafequeue.h"
// LLThreadSafeQueueImplementation
//-----------------------------------------------------------------------------
LLThreadSafeQueueImplementation::LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity):
mOwnsPool(pool == 0),
mPool(pool),
mQueue(0)
{
if(mOwnsPool) {
apr_status_t status = apr_pool_create(&mPool, 0);
if(status != APR_SUCCESS) throw LLThreadSafeQueueError("failed to allocate pool");
} else {
; // No op.
}
apr_status_t status = apr_queue_create(&mQueue, capacity, mPool);
if(status != APR_SUCCESS) throw LLThreadSafeQueueError("failed to allocate queue");
}
LLThreadSafeQueueImplementation::~LLThreadSafeQueueImplementation()
{
if(mOwnsPool && (mPool != 0)) apr_pool_destroy(mPool);
if(mQueue != 0) {
if(apr_queue_size(mQueue) != 0) llwarns <<
"terminating queue which still contains elements;" <<
"memory will be leaked" << LL_ENDL;
apr_queue_term(mQueue);
}
}
void LLThreadSafeQueueImplementation::pushFront(void * element)
{
apr_status_t status = apr_queue_push(mQueue, element);
if(status == APR_EINTR) {
throw LLThreadSafeQueueInterrupt();
} else if(status != APR_SUCCESS) {
throw LLThreadSafeQueueError("push failed");
} else {
; // Success.
}
}
bool LLThreadSafeQueueImplementation::tryPushFront(void * element){
return apr_queue_trypush(mQueue, element) == APR_SUCCESS;
}
void * LLThreadSafeQueueImplementation::popBack(void)
{
void * element;
apr_status_t status = apr_queue_pop(mQueue, &element);
if(status == APR_EINTR) {
throw LLThreadSafeQueueInterrupt();
} else if(status != APR_SUCCESS) {
throw LLThreadSafeQueueError("pop failed");
} else {
return element;
}
}
bool LLThreadSafeQueueImplementation::tryPopBack(void *& element)
{
return apr_queue_trypop(mQueue, &element) == APR_SUCCESS;
}
size_t LLThreadSafeQueueImplementation::size()
{
return apr_queue_size(mQueue);
}

View File

@ -0,0 +1,205 @@
/**
* @file llthreadsafequeue.h
* @brief Base classes for thread, mutex and condition handling.
*
* $LicenseInfo:firstyear=2004&license=viewerlgpl$
* Second Life Viewer Source Code
* Copyright (C) 2010, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
* $/LicenseInfo$
*/
#ifndef LL_LLTHREADSAFEQUEUE_H
#define LL_LLTHREADSAFEQUEUE_H
#include <string>
#include <stdexcept>
struct apr_pool_t; // From apr_pools.h
class LLThreadSafeQueueImplementation; // See below.
//
// A general queue exception.
//
class LLThreadSafeQueueError:
public std::runtime_error
{
public:
LLThreadSafeQueueError(std::string const & message):
std::runtime_error(message)
{
; // No op.
}
};
//
// An exception raised when blocking operations are interrupted.
//
class LLThreadSafeQueueInterrupt:
public LLThreadSafeQueueError
{
public:
LLThreadSafeQueueInterrupt(void):
LLThreadSafeQueueError("queue operation interrupted")
{
; // No op.
}
};
struct apr_queue_t; // From apr_queue.h
//
// Implementation details.
//
class LLThreadSafeQueueImplementation
{
public:
LLThreadSafeQueueImplementation(apr_pool_t * pool, unsigned int capacity);
~LLThreadSafeQueueImplementation();
void pushFront(void * element);
bool tryPushFront(void * element);
void * popBack(void);
bool tryPopBack(void *& element);
size_t size();
private:
bool mOwnsPool;
apr_pool_t * mPool;
apr_queue_t * mQueue;
};
//
// Implements a thread safe FIFO.
//
template<typename ElementT>
class LLThreadSafeQueue
{
public:
typedef ElementT value_type;
// If the pool is set to NULL one will be allocated and managed by this
// queue.
LLThreadSafeQueue(apr_pool_t * pool = 0, unsigned int capacity = 1024);
// Add an element to the front of queue (will block if the queue has
// reached capacity).
//
// This call will raise an interrupt error if the queue is deleted while
// the caller is blocked.
void pushFront(ElementT const & element);
// Try to add an element to the front ofqueue without blocking. Returns
// true only if the element was actually added.
bool tryPushFront(ElementT const & element);
// Pop the element at the end of the queue (will block if the queue is
// empty).
//
// This call will raise an interrupt error if the queue is deleted while
// the caller is blocked.
ElementT popBack(void);
// Pop an element from the end of the queue if there is one available.
// Returns true only if an element was popped.
bool tryPopBack(ElementT & element);
// Returns the size of the queue.
size_t size();
private:
LLThreadSafeQueueImplementation mImplementation;
};
// LLThreadSafeQueue
//-----------------------------------------------------------------------------
template<typename ElementT>
LLThreadSafeQueue<ElementT>::LLThreadSafeQueue(apr_pool_t * pool, unsigned int capacity):
mImplementation(pool, capacity)
{
; // No op.
}
template<typename ElementT>
void LLThreadSafeQueue<ElementT>::pushFront(ElementT const & element)
{
ElementT * elementCopy = new ElementT(element);
try {
mImplementation.pushFront(elementCopy);
} catch (LLThreadSafeQueueInterrupt) {
delete elementCopy;
throw;
}
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPushFront(ElementT const & element)
{
ElementT * elementCopy = new ElementT(element);
bool result = mImplementation.tryPushFront(elementCopy);
if(!result) delete elementCopy;
return result;
}
template<typename ElementT>
ElementT LLThreadSafeQueue<ElementT>::popBack(void)
{
ElementT * element = reinterpret_cast<ElementT *> (mImplementation.popBack());
ElementT result(*element);
delete element;
return result;
}
template<typename ElementT>
bool LLThreadSafeQueue<ElementT>::tryPopBack(ElementT & element)
{
void * storedElement;
bool result = mImplementation.tryPopBack(storedElement);
if(result) {
ElementT * elementPtr = reinterpret_cast<ElementT *>(storedElement);
element = *elementPtr;
delete elementPtr;
} else {
; // No op.
}
return result;
}
template<typename ElementT>
size_t LLThreadSafeQueue<ElementT>::size(void)
{
return mImplementation.size();
}
#endif

View File

@ -283,6 +283,7 @@ set(viewer_SOURCE_FILES
llloginhandler.cpp
lllogininstance.cpp
llmachineid.cpp
llmainlooprepeater.cpp
llmanip.cpp
llmaniprotate.cpp
llmanipscale.cpp
@ -815,6 +816,7 @@ set(viewer_HEADER_FILES
llloginhandler.h
lllogininstance.h
llmachineid.h
llmainlooprepeater.h
llmanip.h
llmaniprotate.h
llmanipscale.h

View File

@ -197,6 +197,8 @@
#include "llsecapi.h"
#include "llmachineid.h"
#include "llmainlooprepeater.h"
// *FIX: These extern globals should be cleaned up.
// The globals either represent state/config/resource-storage of either
// this app, or another 'component' of the viewer. App globals should be
@ -801,6 +803,9 @@ bool LLAppViewer::init()
return 1;
}
// Initialize the repeater service.
LLMainLoopRepeater::getInstance()->start();
//
// Initialize the window
//

View File

@ -0,0 +1,82 @@
/**
* @file llmachineid.cpp
* @brief retrieves unique machine ids
*
* $LicenseInfo:firstyear=2009&license=viewerlgpl$
* Second Life Viewer Source Code
* Copyright (C) 2010, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
* $/LicenseInfo$
*/
#include "llviewerprecompiledheaders.h"
#include "llapr.h"
#include "llevents.h"
#include "llmainlooprepeater.h"
// LLMainLoopRepeater
//-----------------------------------------------------------------------------
LLMainLoopRepeater::LLMainLoopRepeater(void):
mQueue(gAPRPoolp, 1024)
{
; // No op.
}
void LLMainLoopRepeater::start(void)
{
mMainLoopConnection = LLEventPumps::instance().
obtain("mainloop").listen("stupid name here", boost::bind(&LLMainLoopRepeater::onMainLoop, this, _1));
mRepeaterConnection = LLEventPumps::instance().
obtain("mainlooprepeater").listen("other stupid name here", boost::bind(&LLMainLoopRepeater::onMessage, this, _1));
}
void LLMainLoopRepeater::stop(void)
{
mMainLoopConnection.release();
mRepeaterConnection.release();
}
bool LLMainLoopRepeater::onMainLoop(LLSD const &)
{
LLSD message;
while(mQueue.tryPopBack(message)) {
std::string pump = message["pump"].asString();
if(pump.length() == 0 ) continue; // No pump.
LLEventPumps::instance().obtain(pump).post(message["payload"]);
}
return false;
}
bool LLMainLoopRepeater::onMessage(LLSD const & event)
{
try {
mQueue.pushFront(event);
} catch(LLThreadSafeQueueError & e) {
llwarns << "could not repeat message (" << e.what() << ")" <<
event.asString() << LL_ENDL;
}
return false;
}

View File

@ -0,0 +1,65 @@
/**
* @file llmainlooprepeater.h
* @brief a service for repeating messages on the main loop.
*
* $LicenseInfo:firstyear=2010&license=viewerlgpl$
* Second Life Viewer Source Code
* Copyright (C) 2010, Linden Research, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation;
* version 2.1 of the License only.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*
* Linden Research, Inc., 945 Battery Street, San Francisco, CA 94111 USA
* $/LicenseInfo$
*/
#ifndef LL_LLMAINLOOPREPEATER_H
#define LL_LLMAINLOOPREPEATER_H
#include "llsd.h"
#include "llthreadsafequeue.h"
//
// A service which creates the pump 'mainlooprepeater' to which any thread can
// post a message that will be re-posted on the main loop.
//
// The posted message should contain two map elements: pump and payload. The
// pump value is a string naming the pump to which the message should be
// re-posted. The payload value is what will be posted to the designated pump.
//
class LLMainLoopRepeater:
public LLSingleton<LLMainLoopRepeater>
{
public:
LLMainLoopRepeater(void);
// Start the repeater service.
void start(void);
// Stop the repeater service.
void stop(void);
private:
LLTempBoundListener mMainLoopConnection;
LLTempBoundListener mRepeaterConnection;
LLThreadSafeQueue<LLSD> mQueue;
bool onMainLoop(LLSD const &);
bool onMessage(LLSD const & event);
};
#endif