Result of svn merge -r74235:74242 svn+ssh://svn/svn/linden/branches/robust-pump into release

master
Aaron Brashears 2007-12-05 01:15:45 +00:00
parent f8511d77a7
commit 2a9be0445b
9 changed files with 342 additions and 53 deletions

View File

@ -51,6 +51,7 @@ static const std::string STATUS_ERROR_NAMES[LLIOPipe::STATUS_ERROR_COUNT] =
std::string("STATUS_NOT_IMPLEMENTED"),
std::string("STATUS_PRECONDITION_NOT_MET"),
std::string("STATUS_NO_CONNECTION"),
std::string("STATUS_LOST_CONNECTION"),
std::string("STATUS_EXPIRED"),
};

View File

@ -148,11 +148,14 @@ public:
// This means we could not connect to a remote host.
STATUS_NO_CONNECTION = -4,
// This means we could not connect to a remote host.
STATUS_EXPIRED = -5,
// The connection was lost.
STATUS_LOST_CONNECTION = -5,
// The totoal process time has exceeded the timeout.
STATUS_EXPIRED = -6,
// Keep track of the count of codes here.
STATUS_ERROR_COUNT = 5,
STATUS_ERROR_COUNT = 6,
};
/**

View File

@ -64,6 +64,40 @@ bool is_addr_in_use(apr_status_t status)
#endif
}
#if LL_LINUX
// Define this to see the actual file descriptors being tossed around.
//#define LL_DEBUG_SOCKET_FILE_DESCRIPTORS 1
#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
#include "apr-1/apr_portable.h"
#endif
#endif
// Quick function
void ll_debug_socket(const char* msg, apr_socket_t* apr_sock)
{
#if LL_DEBUG_SOCKET_FILE_DESCRIPTORS
if(!apr_sock)
{
lldebugs << "Socket -- " << (msg?msg:"") << ": no socket." << llendl;
return;
}
// *TODO: Why doesn't this work?
//apr_os_sock_t os_sock;
int os_sock;
if(APR_SUCCESS == apr_os_sock_get(&os_sock, apr_sock))
{
lldebugs << "Socket -- " << (msg?msg:"") << " on fd " << os_sock
<< " at " << apr_sock << llendl;
}
else
{
lldebugs << "Socket -- " << (msg?msg:"") << " no fd "
<< " at " << apr_sock << llendl;
}
#endif
}
///
/// LLSocket
///
@ -199,6 +233,7 @@ bool LLSocket::blockingConnect(const LLHost& host)
return false;
}
apr_socket_timeout_set(mSocket, 1000);
ll_debug_socket("Blocking connect", mSocket);
if(ll_apr_warn_status(apr_socket_connect(mSocket, sa))) return false;
setOptions();
return true;
@ -209,6 +244,7 @@ LLSocket::LLSocket(apr_socket_t* socket, apr_pool_t* pool) :
mPool(pool),
mPort(PORT_INVALID)
{
ll_debug_socket("Constructing wholely formed socket", mSocket);
LLMemType m1(LLMemType::MTYPE_IO_TCP);
}
@ -216,9 +252,9 @@ LLSocket::~LLSocket()
{
LLMemType m1(LLMemType::MTYPE_IO_TCP);
// *FIX: clean up memory we are holding.
//lldebugs << "Destroying LLSocket" << llendl;
if(mSocket)
{
ll_debug_socket("Destroying socket", mSocket);
apr_socket_close(mSocket);
}
if(mPool)

View File

@ -34,6 +34,7 @@
#include "linden_common.h"
#include "llpumpio.h"
#include <map>
#include <set>
#include "apr-1/apr_poll.h"
@ -41,10 +42,15 @@
#include "llmemtype.h"
#include "llstl.h"
// This should not be in production, but it is intensely useful during
// development.
// These should not be enabled in production, but they can be
// intensely useful during development for finding certain kinds of
// bugs.
#if LL_LINUX
#define LL_DEBUG_PIPE_TYPE_IN_PUMP 0
//#define LL_DEBUG_PIPE_TYPE_IN_PUMP 1
//#define LL_DEBUG_POLL_FILE_DESCRIPTORS 1
#if LL_DEBUG_POLL_FILE_DESCRIPTORS
#include "apr-1/apr_portable.h"
#endif
#endif
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
@ -73,6 +79,52 @@ extern const F32 NEVER_CHAIN_EXPIRY_SECS = 0.0f;
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_IN 1
//#define LL_DEBUG_SPEW_BUFFER_CHANNEL_OUT 1
//
// local functions
//
void ll_debug_poll_fd(const char* msg, const apr_pollfd_t* poll)
{
#if LL_DEBUG_POLL_FILE_DESCRIPTORS
if(!poll)
{
lldebugs << "Poll -- " << (msg?msg:"") << ": no pollfd." << llendl;
return;
}
if(poll->desc.s)
{
apr_os_sock_t os_sock;
if(APR_SUCCESS == apr_os_sock_get(&os_sock, poll->desc.s))
{
lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_sock
<< " at " << poll->desc.s << llendl;
}
else
{
lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
<< " at " << poll->desc.s << llendl;
}
}
else if(poll->desc.f)
{
apr_os_file_t os_file;
if(APR_SUCCESS == apr_os_file_get(&os_file, poll->desc.f))
{
lldebugs << "Poll -- " << (msg?msg:"") << " on fd " << os_file
<< " at " << poll->desc.f << llendl;
}
else
{
lldebugs << "Poll -- " << (msg?msg:"") << " no fd "
<< " at " << poll->desc.f << llendl;
}
}
else
{
lldebugs << "Poll -- " << (msg?msg:"") << ": no descriptor." << llendl;
}
#endif
}
/**
* @class
*/
@ -217,50 +269,88 @@ bool LLPumpIO::setTimeoutSeconds(F32 timeout)
return true;
}
static std::string events_2_string(apr_int16_t events)
{
std::ostringstream ostr;
if(events & APR_POLLIN)
{
ostr << "read,";
}
if(events & APR_POLLPRI)
{
ostr << "priority,";
}
if(events & APR_POLLOUT)
{
ostr << "write,";
}
if(events & APR_POLLERR)
{
ostr << "error,";
}
if(events & APR_POLLHUP)
{
ostr << "hangup,";
}
if(events & APR_POLLNVAL)
{
ostr << "invalid,";
}
return chop_tail_copy(ostr.str(), 1);
}
bool LLPumpIO::setConditional(LLIOPipe* pipe, const apr_pollfd_t* poll)
{
LLMemType m1(LLMemType::MTYPE_IO_PUMP);
//lldebugs << "LLPumpIO::setConditional" << llendl;
if(pipe)
{
// remove any matching poll file descriptors for this pipe.
LLIOPipe::ptr_t pipe_ptr(pipe);
LLChainInfo::conditionals_t::iterator it;
it = (*mCurrentChain).mDescriptors.begin();
while(it != (*mCurrentChain).mDescriptors.end())
{
LLChainInfo::pipe_conditional_t& value = (*it);
if(pipe_ptr == value.first)
{
ll_delete_apr_pollset_fd_client_data()(value);
it = (*mCurrentChain).mDescriptors.erase(it);
mRebuildPollset = true;
}
else
{
++it;
}
}
if(!pipe) return false;
ll_debug_poll_fd("Set conditional", poll);
if(poll)
lldebugs << "Setting conditionals (" << events_2_string(poll->reqevents)
<< ") "
#if LL_DEBUG_PIPE_TYPE_IN_PUMP
<< "on pipe " << typeid(*pipe).name()
#endif
<< " at " << pipe << llendl;
// remove any matching poll file descriptors for this pipe.
LLIOPipe::ptr_t pipe_ptr(pipe);
LLChainInfo::conditionals_t::iterator it;
it = (*mCurrentChain).mDescriptors.begin();
while(it != (*mCurrentChain).mDescriptors.end())
{
LLChainInfo::pipe_conditional_t& value = (*it);
if(pipe_ptr == value.first)
{
LLChainInfo::pipe_conditional_t value;
value.first = pipe_ptr;
value.second = *poll;
if(!poll->p)
{
// each fd needs a pool to work with, so if one was
// not specified, use this pool.
// *FIX: Should it always be this pool?
value.second.p = mPool;
}
value.second.client_data = new S32(++mPollsetClientID);
(*mCurrentChain).mDescriptors.push_back(value);
ll_delete_apr_pollset_fd_client_data()(value);
it = (*mCurrentChain).mDescriptors.erase(it);
mRebuildPollset = true;
}
else
{
++it;
}
}
if(!poll)
{
mRebuildPollset = true;
return true;
}
return false;
LLChainInfo::pipe_conditional_t value;
value.first = pipe_ptr;
value.second = *poll;
value.second.rtnevents = 0;
if(!poll->p)
{
// each fd needs a pool to work with, so if one was
// not specified, use this pool.
// *FIX: Should it always be this pool?
value.second.p = mPool;
}
value.second.client_data = new S32(++mPollsetClientID);
(*mCurrentChain).mDescriptors.push_back(value);
mRebuildPollset = true;
return true;
}
S32 LLPumpIO::setLock()
@ -412,24 +502,25 @@ void LLPumpIO::pump(const S32& poll_timeout)
}
// Poll based on the last known pollset
// *FIX: may want to pass in a poll timeout so it works correctly
// *TODO: may want to pass in a poll timeout so it works correctly
// in single and multi threaded processes.
PUMP_DEBUG;
typedef std::set<S32> signal_client_t;
typedef std::map<S32, S32> signal_client_t;
signal_client_t signalled_client;
const apr_pollfd_t* poll_fd = NULL;
if(mPollset)
{
PUMP_DEBUG;
//llinfos << "polling" << llendl;
S32 count = 0;
S32 client_id = 0;
const apr_pollfd_t* poll_fd = NULL;
apr_pollset_poll(mPollset, poll_timeout, &count, &poll_fd);
PUMP_DEBUG;
for(S32 i = 0; i < count; ++i)
for(S32 ii = 0; ii < count; ++ii)
{
client_id = *((S32*)poll_fd[i].client_data);
signalled_client.insert(client_id);
ll_debug_poll_fd("Signalled pipe", &poll_fd[ii]);
client_id = *((S32*)poll_fd[ii].client_data);
signalled_client[client_id] = ii;
}
PUMP_DEBUG;
}
@ -515,16 +606,49 @@ void LLPumpIO::pump(const S32& poll_timeout)
LLChainInfo::conditionals_t::iterator end;
end = (*run_chain).mDescriptors.end();
S32 client_id = 0;
signal_client_t::iterator signal;
for(; it != end; ++it)
{
PUMP_DEBUG;
client_id = *((S32*)((*it).second.client_data));
if(signalled_client.find(client_id) != not_signalled)
signal = signalled_client.find(client_id);
if (signal == not_signalled) continue;
static const apr_int16_t POLL_CHAIN_ERROR =
APR_POLLHUP | APR_POLLNVAL | APR_POLLERR;
const apr_pollfd_t* poll = &(poll_fd[(*signal).second]);
if(poll->rtnevents & POLL_CHAIN_ERROR)
{
process_this_chain = true;
// Potential eror condition has been
// returned. If HUP was one of them, we pass
// that as the error even though there may be
// more. If there are in fact more errors,
// we'll just wait for that detection until
// the next pump() cycle to catch it so that
// the logic here gets no more strained than
// it already is.
LLIOPipe::EStatus error_status;
if(poll->rtnevents & APR_POLLHUP)
error_status = LLIOPipe::STATUS_LOST_CONNECTION;
else
error_status = LLIOPipe::STATUS_ERROR;
if(handleChainError(*run_chain, error_status)) break;
ll_debug_poll_fd("Removing pipe", poll);
llwarns << "Removing pipe "
<< (*run_chain).mChainLinks[0].mPipe
<< " '"
<< typeid(
*((*run_chain).mChainLinks[0].mPipe)).name()
<< "' because: "
<< events_2_string(poll->rtnevents)
<< llendl;
(*run_chain).mHead = (*run_chain).mChainLinks.end();
break;
}
//llinfos << "no fd ready for this one." << llendl;
// at least 1 fd got signalled, and there were no
// errors. That means we process this chain.
process_this_chain = true;
break;
}
}
}

View File

@ -424,6 +424,20 @@ protected:
* @return Retuns true if someone handled the error
*/
bool handleChainError(LLChainInfo& chain, LLIOPipe::EStatus error);
public:
/**
* @brief Return number of running chains.
*
* *NOTE: This is only used in debugging and not considered
* efficient or safe enough for production use.
*/
running_chains_t::size_type runningChains() const
{
return mRunningChains.size();
}
};

View File

@ -1080,7 +1080,7 @@ namespace tut
mPool,
mSocket,
factory);
server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 2.0f);
server->setResponseTimeout(SHORT_CHAIN_EXPIRY_SECS + 1.80f);
chain.push_back(LLIOPipe::ptr_t(server));
mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS);
@ -1108,6 +1108,68 @@ namespace tut
F32 elapsed = pump_loop(mPump, SHORT_CHAIN_EXPIRY_SECS + 3.0f);
ensure("Did not take too long", (elapsed < DEFAULT_CHAIN_EXPIRY_SECS));
}
template<> template<>
void fitness_test_object::test<5>()
{
// Set up the server
LLPumpIO::chain_t chain;
typedef LLCloneIOFactory<LLIOSleeper> sleeper_t;
sleeper_t* sleeper = new sleeper_t(new LLIOSleeper);
boost::shared_ptr<LLChainIOFactory> factory(sleeper);
LLIOServerSocket* server = new LLIOServerSocket(
mPool,
mSocket,
factory);
server->setResponseTimeout(1.0);
chain.push_back(LLIOPipe::ptr_t(server));
mPump->addChain(chain, NEVER_CHAIN_EXPIRY_SECS);
// We need to tickle the pump a little to set up the listen()
pump_loop(mPump, 0.1f);
U32 count = mPump->runningChains();
ensure_equals("server chain onboard", count, 1);
lldebugs << "** Server is up." << llendl;
// Set up the client
LLSocket::ptr_t client = LLSocket::create(mPool, LLSocket::STREAM_TCP);
LLHost server_host("127.0.0.1", SERVER_LISTEN_PORT);
bool connected = client->blockingConnect(server_host);
ensure("Connected to server", connected);
lldebugs << "connected" << llendl;
F32 elapsed = pump_loop(mPump,0.1f);
count = mPump->runningChains();
ensure_equals("server chain onboard", count, 2);
lldebugs << "** Client is connected." << llendl;
// We have connected, since the socket reader does not block,
// the first call to read data will return EAGAIN, so we need
// to write something.
chain.clear();
chain.push_back(LLIOPipe::ptr_t(new LLPipeStringInjector("hi")));
chain.push_back(LLIOPipe::ptr_t(new LLIOSocketWriter(client)));
chain.push_back(LLIOPipe::ptr_t(new LLIONull));
mPump->addChain(chain, 0.2);
chain.clear();
// pump for a bit and make sure all 3 chains are running
elapsed = pump_loop(mPump,0.1f);
count = mPump->runningChains();
ensure_equals("client chain onboard", count, 3);
lldebugs << "** request should have been sent." << llendl;
// pump for long enough the the client socket closes, and the
// server socket should not be closed yet.
elapsed = pump_loop(mPump,0.2f);
count = mPump->runningChains();
ensure_equals("client chain timed out ", count, 2);
lldebugs << "** client chain should be closed." << llendl;
// At this point, the socket should be closed by the timeout
elapsed = pump_loop(mPump,1.0f);
count = mPump->runningChains();
ensure_equals("accepted socked close", count, 1);
lldebugs << "** Sleeper should have timed out.." << llendl;
}
}
namespace tut

View File

@ -164,3 +164,25 @@ LLIOPipe::EStatus LLIONull::process_impl(
{
return STATUS_OK;
}
// virtual
LLIOPipe::EStatus LLIOSleeper::process_impl(
const LLChannelDescriptors& channels,
buffer_ptr_t& buffer,
bool& eos,
LLSD& context,
LLPumpIO* pump)
{
if(!mRespond)
{
lldebugs << "LLIOSleeper::process_impl() sleeping." << llendl;
mRespond = true;
static const F64 SLEEP_TIME = 2.0;
pump->sleepChain(SLEEP_TIME);
return STATUS_BREAK;
}
lldebugs << "LLIOSleeper::process_impl() responding." << llendl;
LLBufferStream ostr(channels, buffer.get());
ostr << "huh? sorry, I was sleeping." << std::endl;
return STATUS_DONE;
}

View File

@ -145,4 +145,24 @@ protected:
LLPumpIO* pump);
};
/**
* @brief Pipe that sleeps, and then responds later.
*/
class LLIOSleeper : public LLIOPipe
{
public:
LLIOSleeper() : mRespond(false) {}
protected:
virtual EStatus process_impl(
const LLChannelDescriptors& channels,
buffer_ptr_t& buffer,
bool& eos,
LLSD& context,
LLPumpIO* pump);
private:
bool mRespond;
};
#endif // LL_LLPIPEUTIL_H

View File

@ -170,6 +170,7 @@ static const apr_getopt_option_t TEST_CL_OPTIONS[] =
{"group", 'g', 1, "Run test group specified by option argument."},
{"skip", 's', 1, "Skip test number specified by option argument. Only works when a specific group is being tested"},
{"wait", 'w', 0, "Wait for input before exit."},
{"debug", 'd', 0, "Emit full debug logs."},
{0, 0, 0, 0}
};
@ -224,7 +225,8 @@ int main(int argc, char **argv)
LLError::initForApplication(".");
LLError::setFatalFunction(wouldHaveCrashed);
LLError::setDefaultLevel(LLError::LEVEL_ERROR);
// *FIX: should come from error config file
//< *TODO: should come from error config file. Note that we
// have a command line option that sets this to debug.
#ifdef CTYPE_WORKAROUND
ctype_workaround();
@ -286,6 +288,11 @@ int main(int argc, char **argv)
case 'w':
wait_at_exit = true;
break;
case 'd':
// *TODO: should come from error config file. We set it to
// ERROR by default, so this allows full debug levels.
LLError::setDefaultLevel(LLError::LEVEL_DEBUG);
break;
default:
stream_usage(std::cerr, argv[0]);
return 1;