SH-3184/SH-3221 Improve cleanup, destructor, thread termination, etc. logic in library.

With this commit, the cleanup paths should be production quality.  Unit tests have been
expanded to include cases requiring thread termination and cleanup by the worker thread.
Special operation/request added to support the unit tests.  Thread interface expanded
to include a very aggressive cancel() method that does not do cleanup but prevents the
thread from accessing objects that will be destroyed.
master
Monty Brandenberg 2012-06-23 23:33:50 -04:00
parent bc7d5b24d1
commit e172ec84fa
13 changed files with 447 additions and 75 deletions

View File

@ -47,12 +47,19 @@ HttpLibcurl::HttpLibcurl(HttpService * service)
HttpLibcurl::~HttpLibcurl()
{
// *FIXME: need to cancel requests in this class, not in op class.
shutdown();
mService = NULL;
}
void HttpLibcurl::shutdown()
{
while (! mActiveOps.empty())
{
active_set_t::iterator item(mActiveOps.begin());
(*item)->cancel();
cancelRequest(*item);
(*item)->release();
mActiveOps.erase(item);
}
@ -63,8 +70,6 @@ HttpLibcurl::~HttpLibcurl()
{
if (mMultiHandles[policy_class])
{
// *FIXME: Do some multi cleanup here first
curl_multi_cleanup(mMultiHandles[policy_class]);
mMultiHandles[policy_class] = 0;
}
@ -73,20 +78,12 @@ HttpLibcurl::~HttpLibcurl()
delete [] mMultiHandles;
mMultiHandles = NULL;
}
mService = NULL;
mPolicyCount = 0;
}
void HttpLibcurl::init()
{}
void HttpLibcurl::term()
{}
void HttpLibcurl::setPolicyCount(int policy_count)
void HttpLibcurl::start(int policy_count)
{
llassert_always(policy_count <= POLICY_CLASS_LIMIT);
llassert_always(! mMultiHandles); // One-time call only
@ -143,8 +140,9 @@ HttpService::ELoopSpeed HttpLibcurl::processTransport()
}
else
{
// *FIXME: Issue a logging event for this.
;
LL_WARNS_ONCE("CoreHttp") << "Unexpected message from libcurl. Msg code: "
<< msg->msg
<< LL_ENDL;
}
msgs_in_queue = 0;
}
@ -191,30 +189,61 @@ void HttpLibcurl::addOp(HttpOpRequest * op)
}
// *NOTE: cancelRequest logic parallels completeRequest logic.
// Keep them synchronized as necessary. Caller is expected to
// remove to op from the active list and release the op *after*
// calling this method. It must be called first to deliver the
// op to the reply queue with refcount intact.
void HttpLibcurl::cancelRequest(HttpOpRequest * op)
{
// Deactivate request
op->mCurlActive = false;
// Detach from multi and recycle handle
curl_multi_remove_handle(mMultiHandles[op->mReqPolicy], op->mCurlHandle);
curl_easy_cleanup(op->mCurlHandle);
op->mCurlHandle = NULL;
// Tracing
if (op->mTracing > TRACE_OFF)
{
LL_INFOS("CoreHttp") << "TRACE, RequestCanceled, Handle: "
<< static_cast<HttpHandle>(op)
<< ", Status: " << op->mStatus.toHex()
<< LL_ENDL;
}
// Cancel op and deliver for notification
op->cancel();
}
// *NOTE: cancelRequest logic parallels completeRequest logic.
// Keep them synchronized as necessary.
bool HttpLibcurl::completeRequest(CURLM * multi_handle, CURL * handle, CURLcode status)
{
HttpOpRequest * op(NULL);
curl_easy_getinfo(handle, CURLINFO_PRIVATE, &op);
// *FIXME: check the pointer
if (handle != op->mCurlHandle || ! op->mCurlActive)
{
// *FIXME: This is a sanity check that needs validation/termination.
;
LL_WARNS("CoreHttp") << "libcurl handle and HttpOpRequest handle in disagreement or inactive request."
<< " Handle: " << static_cast<HttpHandle>(handle)
<< LL_ENDL;
return false;
}
active_set_t::iterator it(mActiveOps.find(op));
if (mActiveOps.end() == it)
{
// *FIXME: Fatal condition. This must be here.
;
}
else
{
mActiveOps.erase(it);
LL_WARNS("CoreHttp") << "libcurl completion for request not on active list. Continuing."
<< " Handle: " << static_cast<HttpHandle>(handle)
<< LL_ENDL;
return false;
}
// Deactivate request
mActiveOps.erase(it);
op->mCurlActive = false;
// Set final status of request if it hasn't failed by other mechanisms yet
@ -258,9 +287,21 @@ int HttpLibcurl::getActiveCount() const
}
int HttpLibcurl::getActiveCountInClass(int /* policy_class */) const
int HttpLibcurl::getActiveCountInClass(int policy_class) const
{
return getActiveCount();
int count(0);
for (active_set_t::const_iterator iter(mActiveOps.begin());
mActiveOps.end() != iter;
++iter)
{
if ((*iter)->mReqPolicy == policy_class)
{
++count;
}
}
return count;
}

View File

@ -65,9 +65,6 @@ private:
void operator=(const HttpLibcurl &); // Not defined
public:
static void init();
static void term();
/// Give cycles to libcurl to run active requests. Completed
/// operations (successful or failed) will be retried or handed
/// over to the reply queue as final responses.
@ -79,16 +76,23 @@ public:
void addOp(HttpOpRequest * op);
/// One-time call to set the number of policy classes to be
/// serviced and to create the resources for each.
void setPolicyCount(int policy_count);
/// serviced and to create the resources for each. Value
/// must agree with HttpPolicy::setPolicies() call.
void start(int policy_count);
void shutdown();
int getActiveCount() const;
int getActiveCountInClass(int policy_class) const;
protected:
/// Invoked when libcurl has indicated a request has been processed
/// to completion and we need to move the request to a new state.
bool completeRequest(CURLM * multi_handle, CURL * handle, CURLcode status);
/// Invoked to cancel an active request, mainly during shutdown
/// and destroy.
void cancelRequest(HttpOpRequest * op);
protected:
typedef std::set<HttpOpRequest *> active_set_t;

View File

@ -208,4 +208,40 @@ void HttpOpNull::stageFromRequest(HttpService * service)
}
// ==================================
// HttpOpSpin
// ==================================
HttpOpSpin::HttpOpSpin(int mode)
: HttpOperation(),
mMode(mode)
{}
HttpOpSpin::~HttpOpSpin()
{}
void HttpOpSpin::stageFromRequest(HttpService * service)
{
if (0 == mMode)
{
// Spin forever
while (true)
{
ms_sleep(100);
}
}
else
{
this->addRef();
if (! HttpRequestQueue::instanceOf()->addOp(this))
{
this->release();
}
}
}
} // end namespace LLCore

View File

@ -170,6 +170,31 @@ public:
}; // end class HttpOpNull
/// HttpOpSpin is a test-only request that puts the worker
/// thread into a cpu spin. Used for unit tests and cleanup
/// evaluation. You do not want to use this.
class HttpOpSpin : public HttpOperation
{
public:
// 0 does a hard spin in the operation
// 1 does a soft spin continuously requeuing itself
HttpOpSpin(int mode);
protected:
virtual ~HttpOpSpin();
private:
HttpOpSpin(const HttpOpSpin &); // Not defined
void operator=(const HttpOpSpin &); // Not defined
public:
virtual void stageFromRequest(HttpService *);
protected:
int mMode;
}; // end class HttpOpSpin
} // end namespace LLCore
#endif // _LLCORE_HTTP_OPERATION_H_

View File

@ -75,6 +75,14 @@ HttpPolicy::HttpPolicy(HttpService * service)
HttpPolicy::~HttpPolicy()
{
shutdown();
mService = NULL;
}
void HttpPolicy::shutdown()
{
for (int policy_class(0); policy_class < mActiveClasses; ++policy_class)
{
@ -100,12 +108,12 @@ HttpPolicy::~HttpPolicy()
}
delete [] mState;
mState = NULL;
mService = NULL;
mActiveClasses = 0;
}
void HttpPolicy::setPolicies(const HttpPolicyGlobal & global,
const std::vector<HttpPolicyClass> & classes)
void HttpPolicy::start(const HttpPolicyGlobal & global,
const std::vector<HttpPolicyClass> & classes)
{
llassert_always(! mState);
@ -244,6 +252,7 @@ bool HttpPolicy::changePriority(HttpHandle handle, HttpRequest::priority_t prior
return false;
}
bool HttpPolicy::stageAfterCompletion(HttpOpRequest * op)
{
static const HttpStatus cant_connect(HttpStatus::EXT_CURL_EASY, CURLE_COULDNT_CONNECT);

View File

@ -60,6 +60,17 @@ private:
void operator=(const HttpPolicy &); // Not defined
public:
/// Cancel all ready and retry requests sending them to
/// their notification queues. Release state resources
/// making further request handling impossible.
void shutdown();
/// Deliver policy definitions and enable handling of
/// requests. One-time call invoked before starting
/// the worker thread.
void start(const HttpPolicyGlobal & global,
const std::vector<HttpPolicyClass> & classes);
/// Give the policy layer some cycles to scan the ready
/// queue promoting higher-priority requests to active
/// as permited.
@ -98,10 +109,6 @@ public:
return mGlobalOptions;
}
void setPolicies(const HttpPolicyGlobal & global,
const std::vector<HttpPolicyClass> & classes);
// Get ready counts for a particular class
int getReadyCount(HttpRequest::policy_t policy_class);

View File

@ -73,11 +73,15 @@ public:
public:
typedef std::vector<HttpOperation *> OpContainer;
/// Insert an object at the back of the reply queue.
/// Insert an object at the back of the request queue.
///
/// Caller must provide one refcount to the queue which takes
/// possession of the count.
///
/// @return Standard status. On failure, caller
/// must dispose of the operation with
/// an explicit release() call.
///
/// Threading: callable by any thread.
HttpStatus addOp(HttpOperation * op);

View File

@ -53,35 +53,43 @@ HttpService::HttpService()
mPolicy(NULL),
mTransport(NULL)
{
// Create the default policy class
HttpPolicyClass pol_class;
pol_class.set(HttpRequest::CP_CONNECTION_LIMIT, DEFAULT_CONNECTIONS);
pol_class.set(HttpRequest::CP_PER_HOST_CONNECTION_LIMIT, DEFAULT_CONNECTIONS);
pol_class.set(HttpRequest::CP_ENABLE_PIPELINING, 0L);
mPolicyClasses.push_back(pol_class);
}
HttpService::~HttpService()
{
mExitRequested = true;
if (RUNNING == sState)
{
// Trying to kill the service object with a running thread
// is a bit tricky.
if (mThread)
{
mThread->cancel();
if (! mThread->timedJoin(2000))
{
// Failed to join, expect problems ahead...
LL_WARNS("CoreHttp") << "Destroying HttpService with running thread. Expect problems."
<< LL_ENDL;
}
}
}
if (mRequestQueue)
{
mRequestQueue->release();
mRequestQueue = NULL;
}
if (mPolicy)
{
// *TODO: need a finalization here
;
}
if (mTransport)
{
// *TODO: need a finalization here
delete mTransport;
mTransport = NULL;
}
delete mTransport;
mTransport = NULL;
delete mPolicy;
mPolicy = NULL;
@ -110,9 +118,22 @@ void HttpService::init(HttpRequestQueue * queue)
void HttpService::term()
{
llassert_always(RUNNING != sState);
if (sInstance)
{
if (RUNNING == sState)
{
// Unclean termination. Thread appears to be running. We'll
// try to give the worker thread a chance to cancel using the
// exit flag...
sInstance->mExitRequested = true;
// And a little sleep
ms_sleep(1000);
// Dtor will make some additional efforts and issue any final
// warnings...
}
delete sInstance;
sInstance = NULL;
}
@ -159,9 +180,9 @@ void HttpService::startThread()
mThread->release();
}
// Push current policy definitions
mPolicy->setPolicies(mPolicyGlobal, mPolicyClasses);
mTransport->setPolicyCount(mPolicyClasses.size());
// Push current policy definitions, enable policy & transport components
mPolicy->start(mPolicyGlobal, mPolicyClasses);
mTransport->start(mPolicyClasses.size());
mThread = new LLCoreInt::HttpThread(boost::bind(&HttpService::threadRun, this, _1));
mThread->addRef(); // Need an explicit reference, implicit one is used internally
@ -174,6 +195,7 @@ void HttpService::stopRequested()
mExitRequested = true;
}
bool HttpService::changePriority(HttpHandle handle, HttpRequest::priority_t priority)
{
bool found(false);
@ -191,9 +213,26 @@ bool HttpService::changePriority(HttpHandle handle, HttpRequest::priority_t prio
void HttpService::shutdown()
{
// Disallow future enqueue of requests
mRequestQueue->stopQueue();
// *FIXME: Run down everything....
// Cancel requests alread on the request queue
HttpRequestQueue::OpContainer ops;
mRequestQueue->fetchAll(false, ops);
while (! ops.empty())
{
HttpOperation * op(ops.front());
ops.erase(ops.begin());
op->cancel();
op->release();
}
// Shutdown transport canceling requests, freeing resources
mTransport->shutdown();
// And now policy
mPolicy->shutdown();
}

View File

@ -134,7 +134,7 @@ public:
/// acquires its weaknesses.
static bool isStopped();
/// Threading: callable by application thread *once*.
/// Threading: callable by consumer thread *once*.
void startThread();
/// Threading: callable by worker thread.
@ -152,23 +152,28 @@ public:
/// Threading: callable by worker thread.
bool changePriority(HttpHandle handle, HttpRequest::priority_t priority);
/// Threading: callable by worker thread.
HttpPolicy & getPolicy()
{
return *mPolicy;
}
/// Threading: callable by worker thread.
HttpLibcurl & getTransport()
{
return *mTransport;
}
/// Threading: callable by consumer thread.
HttpPolicyGlobal & getGlobalOptions()
{
return mPolicyGlobal;
}
/// Threading: callable by consumer thread.
HttpRequest::policy_t createPolicyClass();
/// Threading: callable by consumer thread.
HttpPolicyClass & getClassOptions(HttpRequest::policy_t policy_class)
{
llassert(policy_class >= 0 && policy_class < mPolicyClasses.size());
@ -187,9 +192,9 @@ protected:
static volatile EState sState;
HttpRequestQueue * mRequestQueue;
volatile bool mExitRequested;
// === calling-thread-only data ===
LLCoreInt::HttpThread * mThread;
// === consumer-thread-only data ===
HttpPolicyGlobal mPolicyGlobal;
std::vector<HttpPolicyClass> mPolicyClasses;

View File

@ -27,9 +27,11 @@
#ifndef LLCOREINT_THREAD_H_
#define LLCOREINT_THREAD_H_
#include "linden_common.h"
#include <boost/thread.hpp>
#include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include "_refcounted.h"
@ -91,11 +93,27 @@ public:
mThread->join();
}
inline bool timedJoin(S32 millis)
{
return mThread->timed_join(boost::posix_time::milliseconds(millis));
}
inline bool joinable() const
{
return mThread->joinable();
}
// A very hostile method to force a thread to quit
inline void cancel()
{
boost::thread::native_handle_type thread(mThread->native_handle());
#if LL_WINDOWS
TerminateThread(thread, 0);
#else
pthread_cancel(thread);
#endif
}
private:
boost::function<void(HttpThread *)> mThreadFunc;
boost::thread * mThread;

View File

@ -370,11 +370,13 @@ HttpStatus HttpRequest::createService()
{
HttpStatus status;
llassert_always(! has_inited);
HttpRequestQueue::init();
HttpRequestQueue * rq = HttpRequestQueue::instanceOf();
HttpService::init(rq);
has_inited = true;
if (! has_inited)
{
HttpRequestQueue::init();
HttpRequestQueue * rq = HttpRequestQueue::instanceOf();
HttpService::init(rq);
has_inited = true;
}
return status;
}
@ -384,10 +386,12 @@ HttpStatus HttpRequest::destroyService()
{
HttpStatus status;
llassert_always(has_inited);
HttpService::term();
HttpRequestQueue::term();
has_inited = false;
if (has_inited)
{
HttpService::term();
HttpRequestQueue::term();
has_inited = false;
}
return status;
}
@ -423,6 +427,27 @@ HttpHandle HttpRequest::requestStopThread(HttpHandler * user_handler)
return handle;
}
HttpHandle HttpRequest::requestSpin(int mode)
{
HttpStatus status;
HttpHandle handle(LLCORE_HTTP_HANDLE_INVALID);
HttpOpSpin * op = new HttpOpSpin(mode);
op->setReplyPath(mReplyQueue, NULL);
if (! (status = mRequestQueue->addOp(op))) // transfers refcount
{
op->release();
mLastReqStatus = status;
return handle;
}
mLastReqStatus = status;
handle = static_cast<HttpHandle>(op);
return handle;
}
// ====================================
// Dynamic Policy Methods
// ====================================

View File

@ -409,6 +409,15 @@ public:
///
HttpHandle requestStopThread(HttpHandler * handler);
/// Queue a Spin request.
/// DEBUG/TESTING ONLY. This puts the worker into a CPU spin for
/// test purposes.
///
/// @param mode 0 for hard spin, 1 for soft spin
/// @return Standard handle return cases.
///
HttpHandle requestSpin(int mode);
/// @}
/// @name DynamicPolicyMethods

View File

@ -1230,10 +1230,158 @@ void HttpRequestTestObjectType::test<11>()
}
}
template <> template <>
void HttpRequestTestObjectType::test<12>()
{
ScopedCurlInit ready;
set_test_name("HttpRequest Spin + NoOp + hard termination");
// Handler can be stack-allocated *if* there are no dangling
// references to it after completion of this method.
// Create before memory record as the string copy will bump numbers.
TestHandler2 handler(this, "handler");
// record the total amount of dynamically allocated memory
mMemTotal = GetMemTotal();
mHandlerCalls = 0;
HttpRequest * req = NULL;
try
{
// Get singletons created
HttpRequest::createService();
// Start threading early so that thread memory is invariant
// over the test.
HttpRequest::startThread();
// create a new ref counted object with an implicit reference
req = new HttpRequest();
ensure("Memory allocated on construction", mMemTotal < GetMemTotal());
// Issue a Spin
HttpHandle handle = req->requestSpin(0); // Hard spin
ensure("Valid handle returned for spin request", handle != LLCORE_HTTP_HANDLE_INVALID);
// Issue a NoOp
handle = req->requestNoOp(&handler);
ensure("Valid handle returned for no-op request", handle != LLCORE_HTTP_HANDLE_INVALID);
// Run the notification pump.
int count(0);
int limit(10);
while (count++ < limit && mHandlerCalls < 1)
{
req->update(1000);
usleep(100000);
}
ensure("No notifications received", mHandlerCalls == 0);
// release the request object
delete req;
req = NULL;
// Shut down service
HttpRequest::destroyService();
// Check memory usage
// printf("Old mem: %d, New mem: %d\n", mMemTotal, GetMemTotal());
// ensure("Memory usage back to that at entry", mMemTotal == GetMemTotal());
// This memory test won't work because we're killing the thread
// hard with the hard spinner. There's no opportunity to join
// nicely so many things leak or get destroyed unilaterally.
}
catch (...)
{
stop_thread(req);
delete req;
HttpRequest::destroyService();
throw;
}
}
template <> template <>
void HttpRequestTestObjectType::test<13>()
{
ScopedCurlInit ready;
set_test_name("HttpRequest Spin (soft) + NoOp + hard termination");
// Handler can be stack-allocated *if* there are no dangling
// references to it after completion of this method.
// Create before memory record as the string copy will bump numbers.
TestHandler2 handler(this, "handler");
// record the total amount of dynamically allocated memory
mMemTotal = GetMemTotal();
mHandlerCalls = 0;
HttpRequest * req = NULL;
try
{
// Get singletons created
HttpRequest::createService();
// Start threading early so that thread memory is invariant
// over the test.
HttpRequest::startThread();
// create a new ref counted object with an implicit reference
req = new HttpRequest();
ensure("Memory allocated on construction", mMemTotal < GetMemTotal());
// Issue a Spin
HttpHandle handle = req->requestSpin(1);
ensure("Valid handle returned for spin request", handle != LLCORE_HTTP_HANDLE_INVALID);
// Issue a NoOp
handle = req->requestNoOp(&handler);
ensure("Valid handle returned for no-op request", handle != LLCORE_HTTP_HANDLE_INVALID);
// Run the notification pump.
int count(0);
int limit(10);
while (count++ < limit && mHandlerCalls < 1)
{
req->update(1000);
usleep(100000);
}
ensure("NoOp notification received", mHandlerCalls == 1);
// release the request object
delete req;
req = NULL;
// Shut down service
HttpRequest::destroyService();
// Check memory usage
// printf("Old mem: %d, New mem: %d\n", mMemTotal, GetMemTotal());
ensure("Memory usage back to that at entry", mMemTotal == GetMemTotal());
// This memory test should work but could give problems as it
// relies on the worker thread picking up a friendly request
// to shutdown. Doing so, it drops references to things and
// we should go back to where we started. If it gives you
// problems, look into the code before commenting things out.
}
catch (...)
{
stop_thread(req);
delete req;
HttpRequest::destroyService();
throw;
}
}
// *NB: This test must be last. The sleeping webserver
// won't respond for a long time.
template <> template <>
void HttpRequestTestObjectType::test<12>()
void HttpRequestTestObjectType::test<14>()
{
ScopedCurlInit ready;
@ -1352,7 +1500,9 @@ void HttpRequestTestObjectType::test<12>()
throw;
}
}
// *NOTE: This test ^^^^^^^^ must be the last one in the set. It uses a
// sleeping service that interferes with other HTTP tests. Keep it
// last until that little HTTP server can get some attention...
} // end namespace tut