Merge with 26130/5cf2a0a7266e (FS tip).
commit
e181563a27
|
|
@ -337,11 +337,7 @@ LLMutex::~LLMutex()
|
|||
|
||||
void LLMutex::lock()
|
||||
{
|
||||
#if LL_DARWIN
|
||||
if (mLockingThread == LLThread::currentID())
|
||||
#else
|
||||
if (mLockingThread == sThreadID)
|
||||
#endif
|
||||
if(isSelfLocked())
|
||||
{ //redundant lock
|
||||
mCount++;
|
||||
return;
|
||||
|
|
@ -398,6 +394,15 @@ bool LLMutex::isLocked()
|
|||
}
|
||||
}
|
||||
|
||||
bool LLMutex::isSelfLocked()
|
||||
{
|
||||
#if LL_DARWIN
|
||||
return mLockingThread == LLThread::currentID();
|
||||
#else
|
||||
return mLockingThread == sThreadID;
|
||||
#endif
|
||||
}
|
||||
|
||||
U32 LLMutex::lockingThread() const
|
||||
{
|
||||
return mLockingThread;
|
||||
|
|
|
|||
|
|
@ -151,6 +151,7 @@ public:
|
|||
void lock(); // blocks
|
||||
void unlock();
|
||||
bool isLocked(); // non-blocking, but does do a lock/unlock so not free
|
||||
bool isSelfLocked(); //return true if locked in a same thread
|
||||
U32 lockingThread() const; //get ID of locking thread
|
||||
|
||||
protected:
|
||||
|
|
|
|||
|
|
@ -32,6 +32,9 @@
|
|||
#include "llmath.h"
|
||||
#include "llmemtype.h"
|
||||
#include "llstl.h"
|
||||
#include "llthread.h"
|
||||
|
||||
#define ASSERT_LLBUFFERARRAY_MUTEX_LOCKED llassert(!mMutexp || mMutexp->isSelfLocked());
|
||||
|
||||
/**
|
||||
* LLSegment
|
||||
|
|
@ -224,7 +227,8 @@ void LLHeapBuffer::allocate(S32 size)
|
|||
* LLBufferArray
|
||||
*/
|
||||
LLBufferArray::LLBufferArray() :
|
||||
mNextBaseChannel(0)
|
||||
mNextBaseChannel(0),
|
||||
mMutexp(NULL)
|
||||
{
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
}
|
||||
|
|
@ -233,6 +237,8 @@ LLBufferArray::~LLBufferArray()
|
|||
{
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
std::for_each(mBuffers.begin(), mBuffers.end(), DeletePointer());
|
||||
|
||||
delete mMutexp;
|
||||
}
|
||||
|
||||
// static
|
||||
|
|
@ -243,14 +249,57 @@ LLChannelDescriptors LLBufferArray::makeChannelConsumer(
|
|||
return rv;
|
||||
}
|
||||
|
||||
void LLBufferArray::lock()
|
||||
{
|
||||
if(mMutexp)
|
||||
{
|
||||
mMutexp->lock() ;
|
||||
}
|
||||
}
|
||||
|
||||
void LLBufferArray::unlock()
|
||||
{
|
||||
if(mMutexp)
|
||||
{
|
||||
mMutexp->unlock() ;
|
||||
}
|
||||
}
|
||||
|
||||
LLMutex* LLBufferArray::getMutex()
|
||||
{
|
||||
return mMutexp ;
|
||||
}
|
||||
|
||||
void LLBufferArray::setThreaded(bool threaded)
|
||||
{
|
||||
if(threaded)
|
||||
{
|
||||
if(!mMutexp)
|
||||
{
|
||||
mMutexp = new LLMutex(NULL);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if(mMutexp)
|
||||
{
|
||||
delete mMutexp ;
|
||||
mMutexp = NULL ;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LLChannelDescriptors LLBufferArray::nextChannel()
|
||||
{
|
||||
LLChannelDescriptors rv(mNextBaseChannel++);
|
||||
return rv;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
S32 LLBufferArray::capacity() const
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
|
||||
S32 total = 0;
|
||||
const_buffer_iterator_t iter = mBuffers.begin();
|
||||
const_buffer_iterator_t end = mBuffers.end();
|
||||
|
|
@ -263,6 +312,8 @@ S32 LLBufferArray::capacity() const
|
|||
|
||||
bool LLBufferArray::append(S32 channel, const U8* src, S32 len)
|
||||
{
|
||||
LLMutexLock lock(mMutexp) ;
|
||||
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
std::vector<LLSegment> segments;
|
||||
if(copyIntoBuffers(channel, src, len, segments))
|
||||
|
|
@ -273,8 +324,11 @@ bool LLBufferArray::append(S32 channel, const U8* src, S32 len)
|
|||
return false;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
bool LLBufferArray::prepend(S32 channel, const U8* src, S32 len)
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
std::vector<LLSegment> segments;
|
||||
if(copyIntoBuffers(channel, src, len, segments))
|
||||
|
|
@ -293,6 +347,8 @@ bool LLBufferArray::insertAfter(
|
|||
{
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
std::vector<LLSegment> segments;
|
||||
|
||||
LLMutexLock lock(mMutexp) ;
|
||||
if(mSegments.end() != segment)
|
||||
{
|
||||
++segment;
|
||||
|
|
@ -305,8 +361,11 @@ bool LLBufferArray::insertAfter(
|
|||
return false;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
segment_iterator_t end = mSegments.end();
|
||||
segment_iterator_t it = getSegment(address);
|
||||
|
|
@ -335,20 +394,26 @@ LLBufferArray::segment_iterator_t LLBufferArray::splitAfter(U8* address)
|
|||
return rv;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
LLBufferArray::segment_iterator_t LLBufferArray::beginSegment()
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
return mSegments.begin();
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
LLBufferArray::segment_iterator_t LLBufferArray::endSegment()
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
return mSegments.end();
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(
|
||||
U8* address,
|
||||
LLSegment& segment)
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
segment_iterator_t rv = mSegments.begin();
|
||||
segment_iterator_t end = mSegments.end();
|
||||
|
|
@ -395,8 +460,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::constructSegmentAfter(
|
|||
return rv;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
segment_iterator_t end = mSegments.end();
|
||||
if(!address)
|
||||
{
|
||||
|
|
@ -414,9 +481,11 @@ LLBufferArray::segment_iterator_t LLBufferArray::getSegment(U8* address)
|
|||
return end;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
LLBufferArray::const_segment_iterator_t LLBufferArray::getSegment(
|
||||
U8* address) const
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
const_segment_iterator_t end = mSegments.end();
|
||||
if(!address)
|
||||
{
|
||||
|
|
@ -466,6 +535,8 @@ S32 LLBufferArray::countAfter(S32 channel, U8* start) const
|
|||
S32 count = 0;
|
||||
S32 offset = 0;
|
||||
const_segment_iterator_t it;
|
||||
|
||||
LLMutexLock lock(mMutexp) ;
|
||||
const_segment_iterator_t end = mSegments.end();
|
||||
if(start)
|
||||
{
|
||||
|
|
@ -517,6 +588,8 @@ U8* LLBufferArray::readAfter(
|
|||
len = 0;
|
||||
S32 bytes_to_copy = 0;
|
||||
const_segment_iterator_t it;
|
||||
|
||||
LLMutexLock lock(mMutexp) ;
|
||||
const_segment_iterator_t end = mSegments.end();
|
||||
if(start)
|
||||
{
|
||||
|
|
@ -568,6 +641,7 @@ U8* LLBufferArray::seek(
|
|||
U8* start,
|
||||
S32 delta) const
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
const_segment_iterator_t it;
|
||||
const_segment_iterator_t end = mSegments.end();
|
||||
|
|
@ -709,9 +783,14 @@ U8* LLBufferArray::seek(
|
|||
return rv;
|
||||
}
|
||||
|
||||
//test use only
|
||||
bool LLBufferArray::takeContents(LLBufferArray& source)
|
||||
{
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
|
||||
LLMutexLock lock(mMutexp);
|
||||
source.lock();
|
||||
|
||||
std::copy(
|
||||
source.mBuffers.begin(),
|
||||
source.mBuffers.end(),
|
||||
|
|
@ -723,13 +802,17 @@ bool LLBufferArray::takeContents(LLBufferArray& source)
|
|||
std::back_insert_iterator<segment_list_t>(mSegments));
|
||||
source.mSegments.clear();
|
||||
source.mNextBaseChannel = 0;
|
||||
source.unlock();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(
|
||||
S32 channel,
|
||||
S32 len)
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
// start at the end of the buffers, because it is the most likely
|
||||
// to have free space.
|
||||
|
|
@ -765,8 +848,10 @@ LLBufferArray::segment_iterator_t LLBufferArray::makeSegment(
|
|||
return send;
|
||||
}
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
|
||||
// Find out which buffer contains the segment, and if it is found,
|
||||
|
|
@ -792,13 +877,14 @@ bool LLBufferArray::eraseSegment(const segment_iterator_t& erase_iter)
|
|||
return rv;
|
||||
}
|
||||
|
||||
|
||||
//mMutexp should be locked before calling this.
|
||||
bool LLBufferArray::copyIntoBuffers(
|
||||
S32 channel,
|
||||
const U8* src,
|
||||
S32 len,
|
||||
std::vector<LLSegment>& segments)
|
||||
{
|
||||
ASSERT_LLBUFFERARRAY_MUTEX_LOCKED
|
||||
LLMemType m1(LLMemType::MTYPE_IO_BUFFER);
|
||||
if(!src || !len) return false;
|
||||
S32 copied = 0;
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@
|
|||
#include <list>
|
||||
#include <vector>
|
||||
|
||||
class LLMutex;
|
||||
/**
|
||||
* @class LLChannelDescriptors
|
||||
* @brief A way simple interface to accesss channels inside a buffer
|
||||
|
|
@ -564,6 +565,29 @@ public:
|
|||
* @return Returns true on success.
|
||||
*/
|
||||
bool eraseSegment(const segment_iterator_t& iter);
|
||||
|
||||
/**
|
||||
* @brief Lock the mutex if it exists
|
||||
* This method locks mMutexp to make accessing LLBufferArray thread-safe
|
||||
*/
|
||||
void lock();
|
||||
|
||||
/**
|
||||
* @brief Unlock the mutex if it exists
|
||||
*/
|
||||
void unlock();
|
||||
|
||||
/**
|
||||
* @brief Return mMutexp
|
||||
*/
|
||||
LLMutex* getMutex();
|
||||
|
||||
/**
|
||||
* @brief Set LLBufferArray to be shared across threads or not
|
||||
* This method is to create mMutexp if is threaded.
|
||||
* @param threaded Indicates this LLBufferArray instance is shared across threads if true.
|
||||
*/
|
||||
void setThreaded(bool threaded);
|
||||
//@}
|
||||
|
||||
protected:
|
||||
|
|
@ -595,6 +619,7 @@ protected:
|
|||
S32 mNextBaseChannel;
|
||||
buffer_list_t mBuffers;
|
||||
segment_list_t mSegments;
|
||||
LLMutex* mMutexp;
|
||||
};
|
||||
|
||||
#endif // LL_LLBUFFER_H
|
||||
|
|
|
|||
|
|
@ -31,6 +31,7 @@
|
|||
|
||||
#include "llbuffer.h"
|
||||
#include "llmemtype.h"
|
||||
#include "llthread.h"
|
||||
|
||||
static const S32 DEFAULT_OUTPUT_SEGMENT_SIZE = 1024 * 4;
|
||||
|
||||
|
|
@ -62,6 +63,7 @@ int LLBufferStreamBuf::underflow()
|
|||
return EOF;
|
||||
}
|
||||
|
||||
LLMutexLock lock(mBuffer->getMutex());
|
||||
LLBufferArray::segment_iterator_t iter;
|
||||
LLBufferArray::segment_iterator_t end = mBuffer->endSegment();
|
||||
U8* last_pos = (U8*)gptr();
|
||||
|
|
@ -149,6 +151,7 @@ int LLBufferStreamBuf::overflow(int c)
|
|||
// since we got here, we have a buffer, and we have a character to
|
||||
// put on it.
|
||||
LLBufferArray::segment_iterator_t it;
|
||||
LLMutexLock lock(mBuffer->getMutex());
|
||||
it = mBuffer->makeSegment(mChannels.out(), DEFAULT_OUTPUT_SEGMENT_SIZE);
|
||||
if(it != mBuffer->endSegment())
|
||||
{
|
||||
|
|
@ -210,6 +213,7 @@ int LLBufferStreamBuf::sync()
|
|||
|
||||
// *NOTE: I bet we could just --address if address is not NULL.
|
||||
// Need to think about that.
|
||||
LLMutexLock lock(mBuffer->getMutex());
|
||||
address = mBuffer->seek(mChannels.out(), address, -1);
|
||||
if(address)
|
||||
{
|
||||
|
|
@ -273,6 +277,8 @@ streampos LLBufferStreamBuf::seekoff(
|
|||
// NULL is fine
|
||||
break;
|
||||
}
|
||||
|
||||
LLMutexLock lock(mBuffer->getMutex());
|
||||
address = mBuffer->seek(mChannels.in(), base_addr, off);
|
||||
if(address)
|
||||
{
|
||||
|
|
@ -304,6 +310,8 @@ streampos LLBufferStreamBuf::seekoff(
|
|||
// NULL is fine
|
||||
break;
|
||||
}
|
||||
|
||||
LLMutexLock lock(mBuffer->getMutex());
|
||||
address = mBuffer->seek(mChannels.out(), base_addr, off);
|
||||
if(address)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -228,6 +228,8 @@ LLMutex* LLCurl::Easy::sHandleMutexp = NULL ;
|
|||
//static
|
||||
CURL* LLCurl::Easy::allocEasyHandle()
|
||||
{
|
||||
llassert(LLCurl::getCurlThread()) ;
|
||||
|
||||
CURL* ret = NULL;
|
||||
|
||||
LLMutexLock lock(sHandleMutexp) ;
|
||||
|
|
@ -500,6 +502,7 @@ void LLCurl::Easy::prepRequest(const std::string& url,
|
|||
LLProxy::getInstance()->applyProxySettings(this);
|
||||
|
||||
mOutput.reset(new LLBufferArray);
|
||||
mOutput->setThreaded(true);
|
||||
setopt(CURLOPT_WRITEFUNCTION, (void*)&curlWriteCallback);
|
||||
setopt(CURLOPT_WRITEDATA, (void*)this);
|
||||
|
||||
|
|
@ -1046,7 +1049,12 @@ LLCurl::Easy* LLCurlRequest::allocEasy()
|
|||
{
|
||||
addMulti();
|
||||
}
|
||||
llassert_always(mActiveMulti);
|
||||
if(!mActiveMulti)
|
||||
{
|
||||
return NULL ;
|
||||
}
|
||||
|
||||
//llassert_always(mActiveMulti);
|
||||
++mActiveRequestCount;
|
||||
LLCurl::Easy* easy = mActiveMulti->allocEasy();
|
||||
return easy;
|
||||
|
|
|
|||
|
|
@ -818,6 +818,8 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(
|
|||
|
||||
// Copy everything after mLast read to the out.
|
||||
LLBufferArray::segment_iterator_t seg_iter;
|
||||
|
||||
buffer->lock();
|
||||
seg_iter = buffer->splitAfter(mLastRead);
|
||||
if(seg_iter != buffer->endSegment())
|
||||
{
|
||||
|
|
@ -838,7 +840,7 @@ LLIOPipe::EStatus LLHTTPResponder::process_impl(
|
|||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
buffer->unlock();
|
||||
//
|
||||
// *FIX: get rid of extra bytes off the end
|
||||
//
|
||||
|
|
|
|||
|
|
@ -446,6 +446,7 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(
|
|||
// efficient - not only because writev() is better, but also
|
||||
// because we won't have to do as much work to find the start
|
||||
// address.
|
||||
buffer->lock();
|
||||
LLBufferArray::segment_iterator_t it;
|
||||
LLBufferArray::segment_iterator_t end = buffer->endSegment();
|
||||
LLSegment segment;
|
||||
|
|
@ -525,6 +526,8 @@ LLIOPipe::EStatus LLIOSocketWriter::process_impl(
|
|||
}
|
||||
|
||||
}
|
||||
buffer->unlock();
|
||||
|
||||
PUMP_DEBUG;
|
||||
if(done && eos)
|
||||
{
|
||||
|
|
|
|||
|
|
@ -207,6 +207,7 @@ bool LLPumpIO::addChain(const chain_t& chain, F32 timeout, bool has_curl_request
|
|||
info.mHasCurlRequest = has_curl_request;
|
||||
info.setTimeoutSeconds(timeout);
|
||||
info.mData = LLIOPipe::buffer_ptr_t(new LLBufferArray);
|
||||
info.mData->setThreaded(has_curl_request);
|
||||
LLLinkInfo link;
|
||||
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
|
||||
lldebugs << "LLPumpIO::addChain() " << chain[0] << " '"
|
||||
|
|
|
|||
|
|
@ -111,6 +111,7 @@ public:
|
|||
* @param chain The pipes for the chain
|
||||
* @param timeout The number of seconds in the future to
|
||||
* expire. Pass in 0.0f to never expire.
|
||||
* @param has_curl_request The chain contains LLURLRequest if true.
|
||||
* @return Returns true if anything was added to the pump.
|
||||
*/
|
||||
bool addChain(const chain_t& chain, F32 timeout, bool has_curl_request = false);
|
||||
|
|
|
|||
|
|
@ -239,12 +239,17 @@ public:
|
|||
LLSDRPCClientFactory(const std::string& fixed_url) : mURL(fixed_url) {}
|
||||
virtual bool build(LLPumpIO::chain_t& chain, LLSD context) const
|
||||
{
|
||||
llerrs << "Can not call this." << llendl ;
|
||||
|
||||
lldebugs << "LLSDRPCClientFactory::build" << llendl;
|
||||
LLIOPipe::ptr_t service(new Client);
|
||||
chain.push_back(service);
|
||||
LLURLRequest* http(new LLURLRequest(LLURLRequest::HTTP_POST));
|
||||
if(!http->isValid())
|
||||
{
|
||||
llwarns << "Creating LLURLRequest failed." << llendl ;
|
||||
delete http;
|
||||
return false;
|
||||
}
|
||||
|
||||
LLIOPipe::ptr_t service(new Client);
|
||||
chain.push_back(service);
|
||||
LLIOPipe::ptr_t http_pipe(http);
|
||||
http->addHeader("Content-Type: text/llsd");
|
||||
if(mURL.empty())
|
||||
|
|
@ -284,11 +289,17 @@ public:
|
|||
LLXMLSDRPCClientFactory(const std::string& fixed_url) : mURL(fixed_url) {}
|
||||
virtual bool build(LLPumpIO::chain_t& chain, LLSD context) const
|
||||
{
|
||||
llerrs << "who calls this?" << llendl ;
|
||||
lldebugs << "LLXMLSDRPCClientFactory::build" << llendl;
|
||||
LLIOPipe::ptr_t service(new Client);
|
||||
chain.push_back(service);
|
||||
|
||||
LLURLRequest* http(new LLURLRequest(LLURLRequest::HTTP_POST));
|
||||
if(!http->isValid())
|
||||
{
|
||||
llwarns << "Creating LLURLRequest failed." << llendl ;
|
||||
delete http;
|
||||
return false ;
|
||||
}
|
||||
LLIOPipe::ptr_t service(new Client);
|
||||
chain.push_back(service);
|
||||
LLIOPipe::ptr_t http_pipe(http);
|
||||
http->addHeader("Content-Type: text/xml");
|
||||
if(mURL.empty())
|
||||
|
|
|
|||
|
|
@ -476,16 +476,10 @@ void LLURLRequest::initialize()
|
|||
LLMemType m1(LLMemType::MTYPE_IO_URL_REQUEST);
|
||||
mState = STATE_INITIALIZED;
|
||||
mDetail = new LLURLRequestDetail;
|
||||
if(!mDetail)
|
||||
{
|
||||
lldebugs << "LLURLRequestDetail() failed." << llendl;
|
||||
return;
|
||||
}
|
||||
|
||||
if(!mDetail->mCurlRequest)
|
||||
if(!isValid())
|
||||
{
|
||||
lldebugs << "mCurlRequest==0!" << llendl;
|
||||
return;
|
||||
return ;
|
||||
}
|
||||
|
||||
mDetail->mCurlRequest->setopt(CURLOPT_NOSIGNAL, 1);
|
||||
|
|
|
|||
Loading…
Reference in New Issue