241 lines
8.1 KiB
C++
241 lines
8.1 KiB
C++
/**
|
|
* @file llstreamqueue.h
|
|
* @author Nat Goodspeed
|
|
* @date 2012-01-04
|
|
* @brief Definition of LLStreamQueue
|
|
*
|
|
* $LicenseInfo:firstyear=2012&license=viewerlgpl$
|
|
* Copyright (c) 2012, Linden Research, Inc.
|
|
* $/LicenseInfo$
|
|
*/
|
|
|
|
#if ! defined(LL_LLSTREAMQUEUE_H)
|
|
#define LL_LLSTREAMQUEUE_H
|
|
|
|
#include <string>
|
|
#include <list>
|
|
#include <iosfwd> // std::streamsize
|
|
#include <boost/iostreams/categories.hpp>
|
|
|
|
/**
|
|
* This class is a growable buffer between a producer and consumer. It serves
|
|
* as a queue usable with Boost.Iostreams -- hence, a "stream queue."
|
|
*
|
|
* This is especially useful for buffering nonblocking I/O. For instance, we
|
|
* want application logic to be able to serialize LLSD to a std::ostream. We
|
|
* may write more data than the destination pipe can handle all at once, but
|
|
* it's imperative NOT to block the application-level serialization call. So
|
|
* we buffer it instead. Successive frames can try nonblocking writes to the
|
|
* destination pipe until all buffered data has been sent.
|
|
*
|
|
* Similarly, we want application logic be able to deserialize LLSD from a
|
|
* std::istream. Again, we must not block that deserialize call waiting for
|
|
* more data to arrive from the input pipe! Instead we build up a buffer over
|
|
* a number of frames, using successive nonblocking reads, until we have
|
|
* "enough" data to be able to present it through a std::istream.
|
|
*
|
|
* @note The use cases for this class overlap somewhat with those for the
|
|
* LLIOPipe/LLPumpIO hierarchies, and indeed we considered using those. This
|
|
* class has two virtues over the older machinery:
|
|
*
|
|
* # It's vastly simpler -- way fewer concepts. It's not clear to me whether
|
|
* there were ever LLIOPipe/etc. use cases that demanded all the fanciness
|
|
* rolled in, or whether they were simply overdesigned. In any case, no
|
|
* remaining Lindens will admit to familiarity with those classes -- and
|
|
* they're sufficiently obtuse that it would take considerable learning
|
|
* curve to figure out how to use them properly. The bottom line is that
|
|
* current management is not keen on any more engineers climbing that curve.
|
|
* # This class is designed around available components such as std::string,
|
|
* std::list, Boost.Iostreams. There's less proprietary code.
|
|
*/
|
|
template <typename Ch>
|
|
class LLGenericStreamQueue
|
|
{
|
|
public:
|
|
LLGenericStreamQueue():
|
|
mSize(0),
|
|
mClosed(false)
|
|
{}
|
|
|
|
/**
|
|
* Boost.Iostreams Source Device facade for use with other Boost.Iostreams
|
|
* functionality. LLGenericStreamQueue doesn't quite fit any of the Boost
|
|
* 1.48 Iostreams concepts; instead it behaves as both a Sink and a
|
|
* Source. This is its Source facade.
|
|
*/
|
|
struct Source
|
|
{
|
|
typedef Ch char_type;
|
|
typedef boost::iostreams::source_tag category;
|
|
|
|
/// Bind the underlying LLGenericStreamQueue
|
|
Source(LLGenericStreamQueue& sq):
|
|
mStreamQueue(sq)
|
|
{}
|
|
|
|
// Read up to n characters from the underlying data source into the
|
|
// buffer s, returning the number of characters read; return -1 to
|
|
// indicate EOF
|
|
std::streamsize read(Ch* s, std::streamsize n)
|
|
{
|
|
return mStreamQueue.read(s, n);
|
|
}
|
|
|
|
LLGenericStreamQueue& mStreamQueue;
|
|
};
|
|
|
|
/**
|
|
* Boost.Iostreams Sink Device facade for use with other Boost.Iostreams
|
|
* functionality. LLGenericStreamQueue doesn't quite fit any of the Boost
|
|
* 1.48 Iostreams concepts; instead it behaves as both a Sink and a
|
|
* Source. This is its Sink facade.
|
|
*/
|
|
struct Sink
|
|
{
|
|
typedef Ch char_type;
|
|
typedef boost::iostreams::sink_tag category;
|
|
|
|
/// Bind the underlying LLGenericStreamQueue
|
|
Sink(LLGenericStreamQueue& sq):
|
|
mStreamQueue(sq)
|
|
{}
|
|
|
|
/// Write up to n characters from the buffer s to the output sequence,
|
|
/// returning the number of characters written
|
|
std::streamsize write(const Ch* s, std::streamsize n)
|
|
{
|
|
return mStreamQueue.write(s, n);
|
|
}
|
|
|
|
/// Send EOF to consumer
|
|
void close()
|
|
{
|
|
mStreamQueue.close();
|
|
}
|
|
|
|
LLGenericStreamQueue& mStreamQueue;
|
|
};
|
|
|
|
/// Present Boost.Iostreams Source facade
|
|
Source asSource() { return Source(*this); }
|
|
/// Present Boost.Iostreams Sink facade
|
|
Sink asSink() { return Sink(*this); }
|
|
|
|
/// append data to buffer
|
|
std::streamsize write(const Ch* s, std::streamsize n)
|
|
{
|
|
// Unclear how often we might be asked to write 0 bytes -- perhaps a
|
|
// naive caller responding to an unready nonblocking read. But if we
|
|
// do get such a call, don't add a completely empty BufferList entry.
|
|
if (n == 0)
|
|
return n;
|
|
// We could implement this using a single std::string object, a la
|
|
// ostringstream. But the trouble with appending to a string is that
|
|
// you might have to recopy all previous contents to grow its size. If
|
|
// we want this to scale to large data volumes, better to allocate
|
|
// individual pieces.
|
|
mBuffer.push_back(string(s, n));
|
|
mSize += n;
|
|
return n;
|
|
}
|
|
|
|
/**
|
|
* Inform this LLGenericStreamQueue that no further data are forthcoming.
|
|
* For our purposes, close() is strictly a producer-side operation;
|
|
* there's little point in closing the consumer side.
|
|
*/
|
|
void close()
|
|
{
|
|
mClosed = true;
|
|
}
|
|
|
|
/// consume data from buffer
|
|
std::streamsize read(Ch* s, std::streamsize n)
|
|
{
|
|
// read() is actually a convenience method for peek() followed by
|
|
// skip().
|
|
std::streamsize got(peek(s, n));
|
|
// We can only skip() as many characters as we can peek(); ignore
|
|
// skip() return here.
|
|
skip(n);
|
|
return got;
|
|
}
|
|
|
|
/// Retrieve data from buffer without consuming. Like read(), return -1 on
|
|
/// EOF.
|
|
std::streamsize peek(Ch* s, std::streamsize n) const;
|
|
|
|
/// Consume data from buffer without retrieving. Unlike read() and peek(),
|
|
/// at EOF we simply skip 0 characters.
|
|
std::streamsize skip(std::streamsize n);
|
|
|
|
/// How many characters do we currently have buffered?
|
|
std::streamsize size() const
|
|
{
|
|
return mSize;
|
|
}
|
|
|
|
private:
|
|
typedef std::basic_string<Ch> string;
|
|
typedef std::list<string> BufferList;
|
|
BufferList mBuffer;
|
|
std::streamsize mSize;
|
|
bool mClosed;
|
|
};
|
|
|
|
template <typename Ch>
|
|
std::streamsize LLGenericStreamQueue<Ch>::peek(Ch* s, std::streamsize n) const
|
|
{
|
|
// Here we may have to build up 'n' characters from an arbitrary
|
|
// number of individual BufferList entries.
|
|
typename BufferList::const_iterator bli(mBuffer.begin()), blend(mBuffer.end());
|
|
// Indicate EOF if producer has closed the pipe AND we've exhausted
|
|
// all previously-buffered data.
|
|
if (mClosed && bli == blend)
|
|
{
|
|
return -1;
|
|
}
|
|
// Here either producer hasn't yet closed, or we haven't yet exhausted
|
|
// remaining data.
|
|
std::streamsize needed(n), got(0);
|
|
// Loop until either we run out of BufferList entries or we've
|
|
// completely satisfied the request.
|
|
for ( ; bli != blend && needed; ++bli)
|
|
{
|
|
std::streamsize chunk(std::min(needed, std::streamsize(bli->length())));
|
|
std::copy(bli->begin(), bli->begin() + chunk, s);
|
|
needed -= chunk;
|
|
s += chunk;
|
|
got += chunk;
|
|
}
|
|
return got;
|
|
}
|
|
|
|
template <typename Ch>
|
|
std::streamsize LLGenericStreamQueue<Ch>::skip(std::streamsize n)
|
|
{
|
|
typename BufferList::iterator bli(mBuffer.begin()), blend(mBuffer.end());
|
|
std::streamsize toskip(n), skipped(0);
|
|
while (bli != blend && (size_t)toskip >= bli->length())
|
|
{
|
|
std::streamsize chunk(bli->length());
|
|
typename BufferList::iterator zap(bli++);
|
|
mBuffer.erase(zap);
|
|
mSize -= chunk;
|
|
toskip -= chunk;
|
|
skipped += chunk;
|
|
}
|
|
if (bli != blend && toskip)
|
|
{
|
|
bli->erase(bli->begin(), bli->begin() + toskip);
|
|
mSize -= toskip;
|
|
skipped += toskip;
|
|
}
|
|
return skipped;
|
|
}
|
|
|
|
typedef LLGenericStreamQueue<char> LLStreamQueue;
|
|
typedef LLGenericStreamQueue<wchar_t> LLWStreamQueue;
|
|
|
|
#endif /* ! defined(LL_LLSTREAMQUEUE_H) */
|