Merged in nat_linden/viewer-neko (pull request #262)

Add LLEventBatch, LLEventThrottle, LLEventBatchThrottle classes.

Approved-by: Rider Linden <rider@lindenlab.com>
Approved-by: Andrey Kleshchev <andreykproductengine@lindenlab.com>
master
nat_linden 2017-05-17 14:53:00 +00:00
commit 6233797388
5 changed files with 548 additions and 8 deletions

View File

@ -324,26 +324,27 @@ if (LL_TESTS)
LL_ADD_INTEGRATION_TEST(lldeadmantimer "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lldependencies "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llerror "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lleventdispatcher "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lleventcoro "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lleventfilter "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llframetimer "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llheteromap "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llinstancetracker "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llleap "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llpounceable "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llprocess "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llprocessor "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llprocinfo "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llrand "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llsdserialize "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llsingleton "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llstreamqueue "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llstring "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lltrace "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lltreeiterators "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lluri "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llunits "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(stringize "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lleventdispatcher "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(lleventcoro "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llprocess "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llleap "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llstreamqueue "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llpounceable "" "${test_libs}")
LL_ADD_INTEGRATION_TEST(llheteromap "" "${test_libs}")
## llexception_test.cpp isn't a regression test, and doesn't need to be run
## every build. It's to help a developer make implementation choices about

View File

@ -38,12 +38,18 @@
#include "llerror.h" // LL_ERRS
#include "llsdutil.h" // llsd_matches()
/*****************************************************************************
* LLEventFilter
*****************************************************************************/
LLEventFilter::LLEventFilter(LLEventPump& source, const std::string& name, bool tweak):
LLEventStream(name, tweak),
mSource(source.listen(getName(), boost::bind(&LLEventFilter::post, this, _1)))
{
}
/*****************************************************************************
* LLEventMatching
*****************************************************************************/
LLEventMatching::LLEventMatching(const LLSD& pattern):
LLEventFilter("matching"),
mPattern(pattern)
@ -64,6 +70,9 @@ bool LLEventMatching::post(const LLSD& event)
return LLEventStream::post(event);
}
/*****************************************************************************
* LLEventTimeoutBase
*****************************************************************************/
LLEventTimeoutBase::LLEventTimeoutBase():
LLEventFilter("timeout")
{
@ -148,6 +157,14 @@ bool LLEventTimeoutBase::tick(const LLSD&)
return false; // show event to other listeners
}
bool LLEventTimeoutBase::running() const
{
return mMainloop.connected();
}
/*****************************************************************************
* LLEventTimeout
*****************************************************************************/
LLEventTimeout::LLEventTimeout() {}
LLEventTimeout::LLEventTimeout(LLEventPump& source):
@ -164,3 +181,231 @@ bool LLEventTimeout::countdownElapsed() const
{
return mTimer.hasExpired();
}
/*****************************************************************************
* LLEventBatch
*****************************************************************************/
LLEventBatch::LLEventBatch(std::size_t size):
LLEventFilter("batch"),
mBatchSize(size)
{}
LLEventBatch::LLEventBatch(LLEventPump& source, std::size_t size):
LLEventFilter(source, "batch"),
mBatchSize(size)
{}
void LLEventBatch::flush()
{
// copy and clear mBatch BEFORE posting to avoid weird circularity effects
LLSD batch(mBatch);
mBatch.clear();
LLEventStream::post(batch);
}
bool LLEventBatch::post(const LLSD& event)
{
mBatch.append(event);
// calling setSize(same) performs the very check we want
setSize(mBatchSize);
return false;
}
void LLEventBatch::setSize(std::size_t size)
{
mBatchSize = size;
// changing the size might mean that we have to flush NOW
if (mBatch.size() >= mBatchSize)
{
flush();
}
}
/*****************************************************************************
* LLEventThrottleBase
*****************************************************************************/
LLEventThrottleBase::LLEventThrottleBase(F32 interval):
LLEventFilter("throttle"),
mInterval(interval),
mPosts(0)
{}
LLEventThrottleBase::LLEventThrottleBase(LLEventPump& source, F32 interval):
LLEventFilter(source, "throttle"),
mInterval(interval),
mPosts(0)
{}
void LLEventThrottleBase::flush()
{
// flush() is a no-op unless there's something pending.
// Don't test mPending because there's no requirement that the consumer
// post() anything but an isUndefined(). This is what mPosts is for.
if (mPosts)
{
mPosts = 0;
alarmCancel();
// This is not to set our alarm; we are not yet requesting
// any notification. This is just to track whether subsequent post()
// calls fall within this mInterval or not.
timerSet(mInterval);
// copy and clear mPending BEFORE posting to avoid weird circularity
// effects
LLSD pending = mPending;
mPending.clear();
LLEventStream::post(pending);
}
}
LLSD LLEventThrottleBase::pending() const
{
return mPending;
}
bool LLEventThrottleBase::post(const LLSD& event)
{
// Always capture most recent post() event data. If caller wants to
// aggregate multiple events, let them retrieve pending() and modify
// before calling post().
mPending = event;
// Always increment mPosts. Unless we count this call, flush() does
// nothing.
++mPosts;
// We reset mTimer on every flush() call to let us know if we're still
// within the same mInterval. So -- are we?
F32 timeRemaining = timerGetRemaining();
if (! timeRemaining)
{
// more than enough time has elapsed, immediately flush()
flush();
}
else
{
// still within mInterval of the last flush() call: have to defer
if (! alarmRunning())
{
// timeRemaining tells us how much longer it will be until
// mInterval seconds since the last flush() call. At that time,
// flush() deferred events.
alarmActionAfter(timeRemaining, boost::bind(&LLEventThrottleBase::flush, this));
}
}
return false;
}
void LLEventThrottleBase::setInterval(F32 interval)
{
F32 oldInterval = mInterval;
mInterval = interval;
// If we are not now within oldInterval of the last flush(), we're done:
// this will only affect behavior starting with the next flush().
F32 timeRemaining = timerGetRemaining();
if (timeRemaining)
{
// We are currently within oldInterval of the last flush(). Figure out
// how much time remains until (the new) mInterval of the last
// flush(). Bt we don't actually store a timestamp for the last
// flush(); it's implicit. There are timeRemaining seconds until what
// used to be the end of the interval. Move that endpoint by the
// difference between the new interval and the old.
timeRemaining += (mInterval - oldInterval);
// If we're called with a larger interval, the difference is positive
// and timeRemaining increases.
// If we're called with a smaller interval, the difference is negative
// and timeRemaining decreases. The interesting case is when it goes
// nonpositive: when the new interval means we can flush immediately.
if (timeRemaining <= 0.0f)
{
flush();
}
else
{
// immediately reset mTimer
timerSet(timeRemaining);
// and if mAlarm is running, reset that too
if (alarmRunning())
{
alarmActionAfter(timeRemaining, boost::bind(&LLEventThrottleBase::flush, this));
}
}
}
}
F32 LLEventThrottleBase::getDelay() const
{
return timerGetRemaining();
}
/*****************************************************************************
* LLEventThrottle implementation
*****************************************************************************/
LLEventThrottle::LLEventThrottle(F32 interval):
LLEventThrottleBase(interval)
{}
LLEventThrottle::LLEventThrottle(LLEventPump& source, F32 interval):
LLEventThrottleBase(source, interval)
{}
void LLEventThrottle::alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action)
{
mAlarm.actionAfter(interval, action);
}
bool LLEventThrottle::alarmRunning() const
{
return mAlarm.running();
}
void LLEventThrottle::alarmCancel()
{
return mAlarm.cancel();
}
void LLEventThrottle::timerSet(F32 interval)
{
mTimer.setTimerExpirySec(interval);
}
F32 LLEventThrottle::timerGetRemaining() const
{
return mTimer.getRemainingTimeF32();
}
/*****************************************************************************
* LLEventBatchThrottle
*****************************************************************************/
LLEventBatchThrottle::LLEventBatchThrottle(F32 interval, std::size_t size):
LLEventThrottle(interval),
mBatchSize(size)
{}
LLEventBatchThrottle::LLEventBatchThrottle(LLEventPump& source, F32 interval, std::size_t size):
LLEventThrottle(source, interval),
mBatchSize(size)
{}
bool LLEventBatchThrottle::post(const LLSD& event)
{
// simply retrieve pending value and append the new event to it
LLSD partial = pending();
partial.append(event);
bool ret = LLEventThrottle::post(partial);
// The post() call above MIGHT have called flush() already. If it did,
// then pending() was reset to empty. If it did not, though, but the batch
// size has grown to the limit, flush() anyway. If there's a limit at all,
// of course. Calling setSize(same) performs the very check we want.
setSize(mBatchSize);
return ret;
}
void LLEventBatchThrottle::setSize(std::size_t size)
{
mBatchSize = size;
// Changing the size might mean that we have to flush NOW. Don't forget
// that 0 means unlimited.
if (mBatchSize && pending().size() >= mBatchSize)
{
flush();
}
}

View File

@ -177,6 +177,9 @@ public:
/// Cancel timer without event
void cancel();
/// Is this timer currently running?
bool running() const;
protected:
virtual void setCountdown(F32 seconds) = 0;
virtual bool countdownElapsed() const = 0;
@ -215,4 +218,162 @@ private:
LLTimer mTimer;
};
/**
* LLEventBatch: accumulate post() events (LLSD blobs) into an LLSD Array
* until the array reaches a certain size, then call listeners with the Array
* and clear it back to empty.
*/
class LL_COMMON_API LLEventBatch: public LLEventFilter
{
public:
// pass batch size
LLEventBatch(std::size_t size);
// construct and connect
LLEventBatch(LLEventPump& source, std::size_t size);
// force out the pending batch
void flush();
// accumulate an event and flush() when big enough
virtual bool post(const LLSD& event);
// query or reset batch size
std::size_t getSize() const { return mBatchSize; }
void setSize(std::size_t size);
private:
LLSD mBatch;
std::size_t mBatchSize;
};
/**
* LLEventThrottleBase: construct with a time interval. Regardless of how
* frequently you call post(), LLEventThrottle will pass on an event to
* its listeners no more often than once per specified interval.
*
* A new event after more than the specified interval will immediately be
* passed along to listeners. But subsequent events will be delayed until at
* least one time interval since listeners were last called. Consider the
* sequence below. Suppose we have an LLEventThrottle constructed with an
* interval of 3 seconds. The numbers on the left are timestamps in seconds
* relative to an arbitrary reference point.
*
* 1: post(): event immediately passed to listeners, next no sooner than 4
* 2: post(): deferred: waiting for 3 seconds to elapse
* 3: post(): deferred
* 4: no post() call, but event delivered to listeners; next no sooner than 7
* 6: post(): deferred
* 7: no post() call, but event delivered; next no sooner than 10
* 12: post(): immediately passed to listeners, next no sooner than 15
* 17: post(): immediately passed to listeners, next no sooner than 20
*
* For a deferred event, the LLSD blob delivered to listeners is from the most
* recent deferred post() call. However, a sender may obtain the previous
* event blob by calling pending(), modifying it as desired and post()ing the
* new value. (See LLEventBatchThrottle.) Each time an event is delivered to
* listeners, the pending() value is reset to isUndefined().
*
* You may also call flush() to immediately pass along any deferred events to
* all listeners.
*
* @NOTE This is an abstract base class so that, for testing, we can use an
* alternate "timer" that doesn't actually consume real time. See
* LLEventThrottle.
*/
class LL_COMMON_API LLEventThrottleBase: public LLEventFilter
{
public:
// pass time interval
LLEventThrottleBase(F32 interval);
// construct and connect
LLEventThrottleBase(LLEventPump& source, F32 interval);
// force out any deferred events
void flush();
// retrieve (aggregate) deferred event since last event sent to listeners
LLSD pending() const;
// register an event, may be either passed through or deferred
virtual bool post(const LLSD& event);
// query or reset interval
F32 getInterval() const { return mInterval; }
void setInterval(F32 interval);
// deferred posts
std::size_t getPostCount() const { return mPosts; }
// time until next event would be passed through, 0.0 if now
F32 getDelay() const;
protected:
// Implement these time-related methods for a valid LLEventThrottleBase
// subclass (see LLEventThrottle). For testing, we use a subclass that
// doesn't involve actual elapsed time.
virtual void alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) = 0;
virtual bool alarmRunning() const = 0;
virtual void alarmCancel() = 0;
virtual void timerSet(F32 interval) = 0;
virtual F32 timerGetRemaining() const = 0;
private:
// remember throttle interval
F32 mInterval;
// count post() calls since last flush()
std::size_t mPosts;
// pending event data from most recent deferred event
LLSD mPending;
};
/**
* Production implementation of LLEventThrottle.
*/
class LLEventThrottle: public LLEventThrottleBase
{
public:
LLEventThrottle(F32 interval);
LLEventThrottle(LLEventPump& source, F32 interval);
private:
virtual void alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) override;
virtual bool alarmRunning() const override;
virtual void alarmCancel() override;
virtual void timerSet(F32 interval) override;
virtual F32 timerGetRemaining() const override;
// use this to arrange a deferred flush() call
LLEventTimeout mAlarm;
// use this to track whether we're within mInterval of last flush()
LLTimer mTimer;
};
/**
* LLEventBatchThrottle: like LLEventThrottle, it's reluctant to pass events
* to listeners more often than once per specified time interval -- but only
* reluctant, since exceeding the specified batch size limit can cause it to
* deliver accumulated events sooner. Like LLEventBatch, it accumulates
* pending events into an LLSD Array, optionally flushing when the batch grows
* to a certain size.
*/
class LLEventBatchThrottle: public LLEventThrottle
{
public:
// pass time interval and (optionally) max batch size; 0 means batch can
// grow arbitrarily large
LLEventBatchThrottle(F32 interval, std::size_t size = 0);
// construct and connect
LLEventBatchThrottle(LLEventPump& source, F32 interval, std::size_t size = 0);
// append a new event to current batch
virtual bool post(const LLSD& event);
// query or reset batch size
std::size_t getSize() const { return mBatchSize; }
void setSize(std::size_t size);
private:
std::size_t mBatchSize;
};
#endif /* ! defined(LL_LLEVENTFILTER_H) */

View File

@ -138,4 +138,15 @@ struct Collect
StringVec result;
};
struct Concat
{
bool operator()(const LLSD& event)
{
result += event.asString();
return false;
}
void clear() { result.clear(); }
std::string result;
};
#endif /* ! defined(LL_LISTENER_H) */

View File

@ -70,6 +70,85 @@ private:
bool mElapsed;
};
// Similar remarks about LLEventThrottle: we're actually testing the logic in
// LLEventThrottleBase, dummying out the LLTimer and LLEventTimeout used by
// the production LLEventThrottle class.
class TestEventThrottle: public LLEventThrottleBase
{
public:
TestEventThrottle(F32 interval):
LLEventThrottleBase(interval),
mAlarmRemaining(-1),
mTimerRemaining(-1)
{}
TestEventThrottle(LLEventPump& source, F32 interval):
LLEventThrottleBase(source, interval),
mAlarmRemaining(-1),
mTimerRemaining(-1)
{}
/*----- implementation of LLEventThrottleBase timing functionality -----*/
virtual void alarmActionAfter(F32 interval, const LLEventTimeoutBase::Action& action) override
{
mAlarmRemaining = interval;
mAlarmAction = action;
}
virtual bool alarmRunning() const override
{
// decrementing to exactly 0 should mean the alarm fires
return mAlarmRemaining > 0;
}
virtual void alarmCancel() override
{
mAlarmRemaining = -1;
}
virtual void timerSet(F32 interval) override
{
mTimerRemaining = interval;
}
virtual F32 timerGetRemaining() const override
{
// LLTimer.getRemainingTimeF32() never returns negative; 0.0 means expired
return (mTimerRemaining > 0.0)? mTimerRemaining : 0.0;
}
/*------------------- methods for manipulating time --------------------*/
void alarmAdvance(F32 delta)
{
bool wasRunning = alarmRunning();
mAlarmRemaining -= delta;
if (wasRunning && ! alarmRunning())
{
mAlarmAction();
}
}
void timerAdvance(F32 delta)
{
// This simple implementation, like alarmAdvance(), completely ignores
// HOW negative mTimerRemaining might go. All that matters is whether
// it's negative. We trust that no test method in this source will
// drive it beyond the capacity of an F32. Seems like a safe assumption.
mTimerRemaining -= delta;
}
void advance(F32 delta)
{
// Advance the timer first because it has no side effects.
// alarmAdvance() might call flush(), which will need to see the
// change in the timer.
timerAdvance(delta);
alarmAdvance(delta);
}
F32 mAlarmRemaining, mTimerRemaining;
LLEventTimeoutBase::Action mAlarmAction;
};
/*****************************************************************************
* TUT
*****************************************************************************/
@ -116,7 +195,9 @@ namespace tut
listener0.listenTo(driver));
// Construct a pattern LLSD: desired Event must have a key "foo"
// containing string "bar"
LLEventMatching filter(driver, LLSD().insert("foo", "bar"));
LLSD pattern;
pattern.insert("foo", "bar");
LLEventMatching filter(driver, pattern);
listener1.reset(0);
LLTempBoundListener temp2(
listener1.listenTo(filter));
@ -285,6 +366,47 @@ namespace tut
mainloop.post(17);
check_listener("no timeout 3", listener0, LLSD(0));
}
template<> template<>
void filter_object::test<5>()
{
set_test_name("LLEventThrottle");
TestEventThrottle throttle(3);
Concat cat;
throttle.listen("concat", boost::ref(cat));
// (sequence taken from LLEventThrottleBase Doxygen comments)
// 1: post(): event immediately passed to listeners, next no sooner than 4
throttle.advance(1);
throttle.post("1");
ensure_equals("1", cat.result, "1"); // delivered immediately
// 2: post(): deferred: waiting for 3 seconds to elapse
throttle.advance(1);
throttle.post("2");
ensure_equals("2", cat.result, "1"); // "2" not yet delivered
// 3: post(): deferred
throttle.advance(1);
throttle.post("3");
ensure_equals("3", cat.result, "1"); // "3" not yet delivered
// 4: no post() call, but event delivered to listeners; next no sooner than 7
throttle.advance(1);
ensure_equals("4", cat.result, "13"); // "3" delivered
// 6: post(): deferred
throttle.advance(2);
throttle.post("6");
ensure_equals("6", cat.result, "13"); // "6" not yet delivered
// 7: no post() call, but event delivered; next no sooner than 10
throttle.advance(1);
ensure_equals("7", cat.result, "136"); // "6" delivered
// 12: post(): immediately passed to listeners, next no sooner than 15
throttle.advance(5);
throttle.post(";12");
ensure_equals("12", cat.result, "136;12"); // "12" delivered
// 17: post(): immediately passed to listeners, next no sooner than 20
throttle.advance(5);
throttle.post(";17");
ensure_equals("17", cat.result, "136;12;17"); // "17" delivered
}
} // namespace tut
/*****************************************************************************