drain UDP socket to avoid dropped packets (#3565)

drain UDP socket in idleNetwork() to avoid dropped packets
master
Andrew Meadows 2025-02-18 11:38:52 -08:00 committed by GitHub
parent 74d2ed918d
commit 6d0b0a77ee
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 369 additions and 414 deletions

View File

@ -799,7 +799,6 @@ void LLAvatarAppearance::buildCharacter()
bool status = loadAvatar();
stop_glerror();
// gPrintMessagesThisFrame = true;
LL_DEBUGS() << "Avatar load took " << timer.getElapsedTimeF32() << " seconds." << LL_ENDL;
if (!status)

View File

@ -32,8 +32,6 @@
#include "lltimer.h"
#include "llhost.h"
///////////////////////////////////////////////////////////
LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32 size) : mHost(host)
{
mSize = 0;
@ -41,7 +39,7 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32
if (size > NET_BUFFER_SIZE)
{
LL_ERRS() << "Sending packet > " << NET_BUFFER_SIZE << " of size " << size << LL_ENDL;
LL_ERRS() << "Constructing packet with size=" << size << " > " << NET_BUFFER_SIZE << LL_ENDL;
}
else
{
@ -51,7 +49,6 @@ LLPacketBuffer::LLPacketBuffer(const LLHost &host, const char *datap, const S32
mSize = size;
}
}
}
LLPacketBuffer::LLPacketBuffer (S32 hSocket)
@ -59,18 +56,29 @@ LLPacketBuffer::LLPacketBuffer (S32 hSocket)
init(hSocket);
}
///////////////////////////////////////////////////////////
LLPacketBuffer::~LLPacketBuffer ()
{
}
///////////////////////////////////////////////////////////
void LLPacketBuffer::init (S32 hSocket)
void LLPacketBuffer::init(S32 hSocket)
{
mSize = receive_packet(hSocket, mData);
mHost = ::get_sender();
mReceivingIF = ::get_receiving_interface();
}
void LLPacketBuffer::init(const char* buffer, S32 data_size, const LLHost& host)
{
if (data_size > NET_BUFFER_SIZE)
{
LL_ERRS() << "Initializing packet with size=" << data_size << " > " << NET_BUFFER_SIZE << LL_ENDL;
}
else
{
memcpy(mData, buffer, data_size);
mSize = data_size;
mHost = host;
mReceivingIF = ::get_receiving_interface();
}
}

View File

@ -35,20 +35,22 @@ class LLPacketBuffer
{
public:
LLPacketBuffer(const LLHost &host, const char *datap, const S32 size);
LLPacketBuffer(S32 hSocket); // receive a packet
LLPacketBuffer(S32 hSocket); // receive a packet
~LLPacketBuffer();
S32 getSize() const { return mSize; }
const char *getData() const { return mData; }
LLHost getHost() const { return mHost; }
LLHost getReceivingInterface() const { return mReceivingIF; }
void init(S32 hSocket);
void init(const char* buffer, S32 data_size, const LLHost& host);
protected:
char mData[NET_BUFFER_SIZE]; // packet data /* Flawfinder : ignore */
S32 mSize; // size of buffer in bytes
LLHost mHost; // source/dest IP and port
LLHost mReceivingIF; // source/dest IP and port
char mData[NET_BUFFER_SIZE]; // packet data /* Flawfinder : ignore */
S32 mSize; // size of buffer in bytes
LLHost mHost; // source/dest IP and port
LLHost mReceivingIF; // source/dest IP and port
};
#endif

View File

@ -1,6 +1,6 @@
/**
* @file llpacketring.cpp
* @brief implementation of LLPacketRing class for a packet.
* @brief implementation of LLPacketRing class.
*
* $LicenseInfo:firstyear=2001&license=viewerlgpl$
* Second Life Viewer Source Code
@ -43,313 +43,44 @@
#include "message.h"
#include "u64.h"
///////////////////////////////////////////////////////////
LLPacketRing::LLPacketRing () :
mUseInThrottle(false),
mUseOutThrottle(false),
mInThrottle(256000.f),
mOutThrottle(64000.f),
mActualBitsIn(0),
mActualBitsOut(0),
mMaxBufferLength(64000),
mInBufferLength(0),
mOutBufferLength(0),
mDropPercentage(0.0f),
mPacketsToDrop(0x0)
constexpr S16 MAX_BUFFER_RING_SIZE = 1024;
constexpr S16 DEFAULT_BUFFER_RING_SIZE = 256;
LLPacketRing::LLPacketRing ()
: mPacketRing(DEFAULT_BUFFER_RING_SIZE, nullptr)
{
LLHost invalid_host;
for (size_t i = 0; i < mPacketRing.size(); ++i)
{
mPacketRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0);
}
}
///////////////////////////////////////////////////////////
LLPacketRing::~LLPacketRing ()
{
cleanup();
}
///////////////////////////////////////////////////////////
void LLPacketRing::cleanup ()
{
LLPacketBuffer *packetp;
while (!mReceiveQueue.empty())
for (auto packet : mPacketRing)
{
packetp = mReceiveQueue.front();
delete packetp;
mReceiveQueue.pop();
}
while (!mSendQueue.empty())
{
packetp = mSendQueue.front();
delete packetp;
mSendQueue.pop();
delete packet;
}
mPacketRing.clear();
mNumBufferedPackets = 0;
mNumBufferedBytes = 0;
mHeadIndex = 0;
}
///////////////////////////////////////////////////////////
void LLPacketRing::dropPackets (U32 num_to_drop)
{
mPacketsToDrop += num_to_drop;
}
///////////////////////////////////////////////////////////
void LLPacketRing::setDropPercentage (F32 percent_to_drop)
{
mDropPercentage = percent_to_drop;
}
void LLPacketRing::setUseInThrottle(const bool use_throttle)
{
mUseInThrottle = use_throttle;
}
void LLPacketRing::setUseOutThrottle(const bool use_throttle)
{
mUseOutThrottle = use_throttle;
}
void LLPacketRing::setInBandwidth(const F32 bps)
{
mInThrottle.setRate(bps);
}
void LLPacketRing::setOutBandwidth(const F32 bps)
{
mOutThrottle.setRate(bps);
}
///////////////////////////////////////////////////////////
S32 LLPacketRing::receiveFromRing (S32 socket, char *datap)
{
if (mInThrottle.checkOverflow(0))
{
// We don't have enough bandwidth, don't give them a packet.
return 0;
}
LLPacketBuffer *packetp = NULL;
if (mReceiveQueue.empty())
{
// No packets on the queue, don't give them any.
return 0;
}
S32 packet_size = 0;
packetp = mReceiveQueue.front();
mReceiveQueue.pop();
packet_size = packetp->getSize();
if (packetp->getData() != NULL)
{
memcpy(datap, packetp->getData(), packet_size); /*Flawfinder: ignore*/
}
// need to set sender IP/port!!
mLastSender = packetp->getHost();
mLastReceivingIF = packetp->getReceivingInterface();
delete packetp;
this->mInBufferLength -= packet_size;
// Adjust the throttle
mInThrottle.throttleOverflow(packet_size * 8.f);
return packet_size;
}
///////////////////////////////////////////////////////////
S32 LLPacketRing::receivePacket (S32 socket, char *datap)
{
S32 packet_size = 0;
// If using the throttle, simulate a limited size input buffer.
if (mUseInThrottle)
{
bool done = false;
// push any current net packet (if any) onto delay ring
while (!done)
{
LLPacketBuffer *packetp;
packetp = new LLPacketBuffer(socket);
if (packetp->getSize())
{
mActualBitsIn += packetp->getSize() * 8;
// Fake packet loss
if (mDropPercentage && (ll_frand(100.f) < mDropPercentage))
{
mPacketsToDrop++;
}
if (mPacketsToDrop)
{
delete packetp;
packetp = NULL;
packet_size = 0;
mPacketsToDrop--;
}
}
// If we faked packet loss, then we don't have a packet
// to use for buffer overflow testing
if (packetp)
{
if (mInBufferLength + packetp->getSize() > mMaxBufferLength)
{
// Toss it.
LL_WARNS() << "Throwing away packet, overflowing buffer" << LL_ENDL;
delete packetp;
packetp = NULL;
}
else if (packetp->getSize())
{
mReceiveQueue.push(packetp);
mInBufferLength += packetp->getSize();
}
else
{
delete packetp;
packetp = NULL;
done = true;
}
}
else
{
// No packetp, keep going? - no packetp == faked packet loss
}
}
// Now, grab data off of the receive queue according to our
// throttled bandwidth settings.
packet_size = receiveFromRing(socket, datap);
}
else
{
// no delay, pull straight from net
if (LLProxy::isSOCKSProxyEnabled())
{
U8 buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE];
packet_size = receive_packet(socket, static_cast<char*>(static_cast<void*>(buffer)));
if (packet_size > SOCKS_HEADER_SIZE)
{
// *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6)
memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size - SOCKS_HEADER_SIZE);
proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer));
mLastSender.setAddress(header->addr);
mLastSender.setPort(ntohs(header->port));
packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size
}
else
{
packet_size = 0;
}
}
else
{
packet_size = receive_packet(socket, datap);
mLastSender = ::get_sender();
}
mLastReceivingIF = ::get_receiving_interface();
if (packet_size) // did we actually get a packet?
{
if (mDropPercentage && (ll_frand(100.f) < mDropPercentage))
{
mPacketsToDrop++;
}
if (mPacketsToDrop)
{
packet_size = 0;
mPacketsToDrop--;
}
}
}
return packet_size;
bool drop = computeDrop();
return (mNumBufferedPackets > 0) ?
receiveOrDropBufferedPacket(datap, drop) :
receiveOrDropPacket(socket, datap, drop);
}
bool LLPacketRing::sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host)
bool send_packet_helper(int socket, const char * datap, S32 data_size, LLHost host)
{
bool status = true;
if (!mUseOutThrottle)
{
return sendPacketImpl(h_socket, send_buffer, buf_size, host );
}
else
{
mActualBitsOut += buf_size * 8;
LLPacketBuffer *packetp = NULL;
// See if we've got enough throttle to send a packet.
while (!mOutThrottle.checkOverflow(0.f))
{
// While we have enough bandwidth, send a packet from the queue or the current packet
S32 packet_size = 0;
if (!mSendQueue.empty())
{
// Send a packet off of the queue
LLPacketBuffer *packetp = mSendQueue.front();
mSendQueue.pop();
mOutBufferLength -= packetp->getSize();
packet_size = packetp->getSize();
status = sendPacketImpl(h_socket, packetp->getData(), packet_size, packetp->getHost());
delete packetp;
// Update the throttle
mOutThrottle.throttleOverflow(packet_size * 8.f);
}
else
{
// If the queue's empty, we can just send this packet right away.
status = sendPacketImpl(h_socket, send_buffer, buf_size, host );
packet_size = buf_size;
// Update the throttle
mOutThrottle.throttleOverflow(packet_size * 8.f);
// This was the packet we're sending now, there are no other packets
// that we need to send
return status;
}
}
// We haven't sent the incoming packet, add it to the queue
if (mOutBufferLength + buf_size > mMaxBufferLength)
{
// Nuke this packet, we overflowed the buffer.
// Toss it.
LL_WARNS() << "Throwing away outbound packet, overflowing buffer" << LL_ENDL;
}
else
{
static LLTimer queue_timer;
if ((mOutBufferLength > 4192) && queue_timer.getElapsedTimeF32() > 1.f)
{
// Add it to the queue
LL_INFOS() << "Outbound packet queue " << mOutBufferLength << " bytes" << LL_ENDL;
queue_timer.reset();
}
packetp = new LLPacketBuffer(host, send_buffer, buf_size);
mOutBufferLength += packetp->getSize();
mSendQueue.push(packetp);
}
}
return status;
}
bool LLPacketRing::sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host)
{
if (!LLProxy::isSOCKSProxyEnabled())
{
return send_packet(h_socket, send_buffer, buf_size, host.getAddress(), host.getPort());
return send_packet(socket, datap, data_size, host.getAddress(), host.getPort());
}
char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE];
@ -361,11 +92,252 @@ bool LLPacketRing::sendPacketImpl(int h_socket, const char * send_buffer, S32 bu
socks_header->atype = ADDRESS_IPV4;
socks_header->frag = 0;
memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, send_buffer, buf_size);
memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size);
return send_packet( h_socket,
return send_packet( socket,
headered_send_buffer,
buf_size + SOCKS_HEADER_SIZE,
data_size + SOCKS_HEADER_SIZE,
LLProxy::getInstance()->getUDPProxy().getAddress(),
LLProxy::getInstance()->getUDPProxy().getPort());
}
bool LLPacketRing::sendPacket(int socket, const char * datap, S32 data_size, LLHost host)
{
mActualBytesOut += data_size;
return send_packet_helper(socket, datap, data_size, host);
}
void LLPacketRing::dropPackets (U32 num_to_drop)
{
mPacketsToDrop += num_to_drop;
}
void LLPacketRing::setDropPercentage (F32 percent_to_drop)
{
mDropPercentage = percent_to_drop;
}
bool LLPacketRing::computeDrop()
{
bool drop= (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage));
if (drop)
{
++mPacketsToDrop;
}
if (mPacketsToDrop > 0)
{
--mPacketsToDrop;
drop = true;
}
return drop;
}
S32 LLPacketRing::receiveOrDropPacket(S32 socket, char *datap, bool drop)
{
S32 packet_size = 0;
// pull straight from socket
if (LLProxy::isSOCKSProxyEnabled())
{
char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */
packet_size = receive_packet(socket, buffer);
if (packet_size > 0)
{
mActualBytesIn += packet_size;
}
if (packet_size > SOCKS_HEADER_SIZE)
{
if (drop)
{
packet_size = 0;
}
else
{
// *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6)
packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size
memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size);
proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer));
mLastSender.setAddress(header->addr);
mLastSender.setPort(ntohs(header->port));
mLastReceivingIF = ::get_receiving_interface();
}
}
else
{
packet_size = 0;
}
}
else
{
packet_size = receive_packet(socket, datap);
if (packet_size > 0)
{
mActualBytesIn += packet_size;
if (drop)
{
packet_size = 0;
}
else
{
mLastSender = ::get_sender();
mLastReceivingIF = ::get_receiving_interface();
}
}
}
return packet_size;
}
S32 LLPacketRing::receiveOrDropBufferedPacket(char *datap, bool drop)
{
assert(mNumBufferedPackets > 0);
S32 packet_size = 0;
S16 ring_size = (S16)(mPacketRing.size());
S16 packet_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size;
LLPacketBuffer* packet = mPacketRing[packet_index];
packet_size = packet->getSize();
mLastSender = packet->getHost();
mLastReceivingIF = packet->getReceivingInterface();
--mNumBufferedPackets;
mNumBufferedBytes -= packet_size;
if (mNumBufferedPackets == 0)
{
assert(mNumBufferedBytes == 0);
}
if (!drop)
{
assert(packet_size > 0);
memcpy(datap, packet->getData(), packet_size);
}
else
{
packet_size = 0;
}
return packet_size;
}
S32 LLPacketRing::bufferInboundPacket(S32 socket)
{
if (mNumBufferedPackets == mPacketRing.size() && mNumBufferedPackets < MAX_BUFFER_RING_SIZE)
{
expandRing();
}
LLPacketBuffer* packet = mPacketRing[mHeadIndex];
S32 old_packet_size = packet->getSize();
S32 packet_size = 0;
if (LLProxy::isSOCKSProxyEnabled())
{
char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */
packet_size = receive_packet(socket, buffer);
if (packet_size > 0)
{
mActualBytesIn += packet_size;
if (packet_size > SOCKS_HEADER_SIZE)
{
// *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6)
proxywrap_t * header = static_cast<proxywrap_t*>(static_cast<void*>(buffer));
LLHost sender;
sender.setAddress(header->addr);
sender.setPort(ntohs(header->port));
packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size
packet->init(buffer + SOCKS_HEADER_SIZE, packet_size, sender);
mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size());
if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE)
{
++mNumBufferedPackets;
mNumBufferedBytes += packet_size;
}
else
{
// we overwrote an older packet
mNumBufferedBytes += packet_size - old_packet_size;
}
}
else
{
packet_size = 0;
}
}
}
else
{
packet->init(socket);
packet_size = packet->getSize();
if (packet_size > 0)
{
mActualBytesIn += packet_size;
mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size());
if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE)
{
++mNumBufferedPackets;
mNumBufferedBytes += packet_size;
}
else
{
// we overwrote an older packet
mNumBufferedBytes += packet_size - old_packet_size;
}
}
}
return packet_size;
}
S32 LLPacketRing::drainSocket(S32 socket)
{
// drain into buffer
S32 packet_size = 1;
S32 num_loops = 0;
S32 old_num_packets = mNumBufferedPackets;
while (packet_size > 0)
{
packet_size = bufferInboundPacket(socket);
++num_loops;
}
S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets;
if (num_dropped_packets > 0)
{
LL_WARNS("Messaging") << "dropped " << num_dropped_packets << " UDP packets" << LL_ENDL;
}
return (S32)(mNumBufferedPackets);
}
bool LLPacketRing::expandRing()
{
// compute larger size
constexpr S16 BUFFER_RING_EXPANSION = 256;
S16 old_size = (S16)(mPacketRing.size());
S16 new_size = llmin(old_size + BUFFER_RING_EXPANSION, MAX_BUFFER_RING_SIZE);
if (new_size == old_size)
{
// mPacketRing is already maxed out
return false;
}
// make a larger ring and copy packet pointers
std::vector<LLPacketBuffer*> new_ring(new_size, nullptr);
for (S16 i = 0; i < old_size; ++i)
{
S16 j = (mHeadIndex + i) % old_size;
new_ring[i] = mPacketRing[j];
}
// allocate new packets for the remainder of new_ring
LLHost invalid_host;
for (S16 i = old_size; i < new_size; ++i)
{
new_ring[i] = new LLPacketBuffer(invalid_host, nullptr, 0);
}
// swap the rings and reset mHeadIndex
mPacketRing.swap(new_ring);
mHeadIndex = mNumBufferedPackets;
return true;
}

View File

@ -25,16 +25,14 @@
* $/LicenseInfo$
*/
#ifndef LL_LLPACKETRING_H
#define LL_LLPACKETRING_H
#pragma once
#include <queue>
#include <vector>
#include "llhost.h"
#include "llpacketbuffer.h"
#include "llproxy.h"
#include "llthrottle.h"
#include "net.h"
class LLPacketRing
{
@ -42,60 +40,65 @@ public:
LLPacketRing();
~LLPacketRing();
void cleanup();
// receive one packet: either buffered or from the socket
S32 receivePacket (S32 socket, char *datap);
// send one packet
bool sendPacket(int h_socket, const char * send_buffer, S32 buf_size, LLHost host);
// drains packets from socket and returns final mNumBufferedPackets
S32 drainSocket(S32 socket);
void dropPackets(U32);
void setDropPercentage (F32 percent_to_drop);
void setUseInThrottle(const bool use_throttle);
void setUseOutThrottle(const bool use_throttle);
void setInBandwidth(const F32 bps);
void setOutBandwidth(const F32 bps);
S32 receivePacket (S32 socket, char *datap);
S32 receiveFromRing (S32 socket, char *datap);
bool sendPacket(int h_socket, char * send_buffer, S32 buf_size, LLHost host);
inline LLHost getLastSender() const;
inline LLHost getLastReceivingInterface() const;
inline LLHost getLastSender();
inline LLHost getLastReceivingInterface();
S32 getActualInBytes() const { return mActualBytesIn; }
S32 getActualOutBytes() const { return mActualBytesOut; }
S32 getAndResetActualInBits() { S32 bits = mActualBytesIn * 8; mActualBytesIn = 0; return bits;}
S32 getAndResetActualOutBits() { S32 bits = mActualBytesOut * 8; mActualBytesOut = 0; return bits;}
S32 getAndResetActualInBits() { S32 bits = mActualBitsIn; mActualBitsIn = 0; return bits;}
S32 getAndResetActualOutBits() { S32 bits = mActualBitsOut; mActualBitsOut = 0; return bits;}
S32 getNumBufferedPackets() const { return (S32)(mNumBufferedPackets); }
S32 getNumBufferedBytes() const { return mNumBufferedBytes; }
protected:
bool mUseInThrottle;
bool mUseOutThrottle;
// returns 'true' if we should intentionally drop a packet
bool computeDrop();
// For simulating a lower-bandwidth connection - BPS
LLThrottle mInThrottle;
LLThrottle mOutThrottle;
// returns packet_size of received packet, zero or less if no packet found
S32 receiveOrDropPacket(S32 socket, char *datap, bool drop);
S32 receiveOrDropBufferedPacket(char *datap, bool drop);
S32 mActualBitsIn;
S32 mActualBitsOut;
S32 mMaxBufferLength; // How much data can we queue up before dropping data.
S32 mInBufferLength; // Current incoming buffer length
S32 mOutBufferLength; // Current outgoing buffer length
// returns packet_size of packet buffered
S32 bufferInboundPacket(S32 socket);
F32 mDropPercentage; // % of packets to drop
U32 mPacketsToDrop; // drop next n packets
// returns 'true' if ring was expanded
bool expandRing();
std::queue<LLPacketBuffer *> mReceiveQueue;
std::queue<LLPacketBuffer *> mSendQueue;
protected:
std::vector<LLPacketBuffer*> mPacketRing;
S16 mHeadIndex { 0 };
S16 mNumBufferedPackets { 0 };
S32 mNumBufferedBytes { 0 };
S32 mActualBytesIn { 0 };
S32 mActualBytesOut { 0 };
F32 mDropPercentage { 0.0f }; // % of inbound packets to drop
U32 mPacketsToDrop { 0 }; // drop next inbound n packets
// These are the sender and receiving_interface for the last packet delivered by receivePacket()
LLHost mLastSender;
LLHost mLastReceivingIF;
private:
bool sendPacketImpl(int h_socket, const char * send_buffer, S32 buf_size, LLHost host);
};
inline LLHost LLPacketRing::getLastSender()
inline LLHost LLPacketRing::getLastSender() const
{
return mLastSender;
}
inline LLHost LLPacketRing::getLastReceivingInterface()
inline LLHost LLPacketRing::getLastReceivingInterface() const
{
return mLastReceivingIF;
}
#endif

View File

@ -656,8 +656,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count )
// UseCircuitCode is allowed in even from an invalid circuit, so that
// we can toss circuits around.
if(
valid_packet &&
else if (
!cdp &&
(mTemplateMessageReader->getMessageName() !=
_PREHASH_UseCircuitCode))
@ -667,8 +666,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count )
valid_packet = false;
}
if(
valid_packet &&
if ( valid_packet &&
cdp &&
!cdp->getTrusted() &&
mTemplateMessageReader->isTrusted())
@ -680,7 +678,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count )
valid_packet = false;
}
if( valid_packet )
if ( valid_packet )
{
logValidMsg(cdp, host, recv_reliable, recv_resent, acks>0 );
valid_packet = mTemplateMessageReader->readMessage(buffer, host);
@ -821,6 +819,11 @@ void LLMessageSystem::processAcks(LockMessageChecker&, F32 collect_time)
}
}
S32 LLMessageSystem::drainUdpSocket()
{
return mPacketRing.drainSocket(mSocket);
}
void LLMessageSystem::copyMessageReceivedToSend()
{
// NOTE: babbage: switch builder to match reader to avoid

View File

@ -417,6 +417,9 @@ public:
bool checkMessages(LockMessageChecker&, S64 frame_count = 0 );
void processAcks(LockMessageChecker&, F32 collect_time = 0.f);
// returns total number of buffered packets after the drain
S32 drainUdpSocket();
bool isMessageFast(const char *msg);
bool isMessage(const char *msg)
{

View File

@ -76,14 +76,8 @@ static U32 gsnReceivingIFAddr = INVALID_HOST_IP_ADDRESS; // Address to which dat
const char* LOOPBACK_ADDRESS_STRING = "127.0.0.1";
const char* BROADCAST_ADDRESS_STRING = "255.255.255.255";
#if LL_DARWIN
// macOS returns an error when trying to set these to 400000. Smaller values succeed.
const int SEND_BUFFER_SIZE = 200000;
const int RECEIVE_BUFFER_SIZE = 200000;
#else // LL_DARWIN
const int SEND_BUFFER_SIZE = 400000;
const int RECEIVE_BUFFER_SIZE = 400000;
#endif // LL_DARWIN
const int SEND_BUFFER_SIZE = 200000;
const int RECEIVE_BUFFER_SIZE = 800000;
// universal functions (cross-platform)

View File

@ -350,8 +350,6 @@ LLVector3 gRelativeWindVec(0.0, 0.0, 0.0);
U32 gPacketsIn = 0;
bool gPrintMessagesThisFrame = false;
bool gRandomizeFramerate = false;
bool gPeriodicSlowFrame = false;
@ -1495,9 +1493,9 @@ bool LLAppViewer::doFrame()
{
LL_PROFILE_ZONE_NAMED_CATEGORY_APP("df pauseMainloopTimeout");
pingMainloopTimeout("Main:Sleep");
pingMainloopTimeout("Main:Sleep");
pauseMainloopTimeout();
pauseMainloopTimeout();
}
// Sleep and run background threads
@ -5217,12 +5215,9 @@ void LLAppViewer::idleNameCache()
// Handle messages, and all message related stuff
//
#define TIME_THROTTLE_MESSAGES
#ifdef TIME_THROTTLE_MESSAGES
#define CHECK_MESSAGES_DEFAULT_MAX_TIME .020f // 50 ms = 50 fps (just for messages!)
constexpr F32 CHECK_MESSAGES_DEFAULT_MAX_TIME = 0.020f; // 50 ms = 50 fps (just for messages!)
static F32 CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME;
#endif
static LLTrace::BlockTimerStatHandle FTM_IDLE_NETWORK("Idle Network");
static LLTrace::BlockTimerStatHandle FTM_MESSAGE_ACKS("Message Acks");
@ -5249,6 +5244,7 @@ void LLAppViewer::idleNetwork()
F32 total_time = 0.0f;
{
bool needs_drain = false;
LockMessageChecker lmc(gMessageSystem);
while (lmc.checkAllMessages(frame_count, gServicePump))
{
@ -5265,50 +5261,41 @@ void LLAppViewer::idleNetwork()
if (total_decoded > MESSAGE_MAX_PER_FRAME)
{
needs_drain = true;
break;
}
#ifdef TIME_THROTTLE_MESSAGES
// Prevent slow packets from completely destroying the frame rate.
// This usually happens due to clumps of avatars taking huge amount
// of network processing time (which needs to be fixed, but this is
// a good limit anyway).
total_time = check_message_timer.getElapsedTimeF32();
if (total_time >= CheckMessagesMaxTime)
{
needs_drain = true;
break;
#endif
}
}
if (needs_drain || gMessageSystem->mPacketRing.getNumBufferedPackets() > 0)
{
// Rather than allow packets to silently backup on the socket
// we drain them into our own buffer so we know how many exist.
S32 num_buffered_packets = gMessageSystem->drainUdpSocket();
if (num_buffered_packets > 0)
{
// Increase CheckMessagesMaxTime so that we will eventually catch up
CheckMessagesMaxTime *= 1.035f; // 3.5% ~= 2x in 20 frames, ~8x in 60 frames
}
}
else
{
// Reset CheckMessagesMaxTime to default value
CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME;
}
// Handle per-frame message system processing.
lmc.processAcks(gSavedSettings.getF32("AckCollectTime"));
}
#ifdef TIME_THROTTLE_MESSAGES
if (total_time >= CheckMessagesMaxTime)
{
// Increase CheckMessagesMaxTime so that we will eventually catch up
CheckMessagesMaxTime *= 1.035f; // 3.5% ~= x2 in 20 frames, ~8x in 60 frames
}
else
{
// Reset CheckMessagesMaxTime to default value
CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME;
}
#endif
// Decode enqueued messages...
S32 remaining_possible_decodes = MESSAGE_MAX_PER_FRAME - total_decoded;
if( remaining_possible_decodes <= 0 )
{
LL_INFOS() << "Maxed out number of messages per frame at " << MESSAGE_MAX_PER_FRAME << LL_ENDL;
}
if (gPrintMessagesThisFrame)
{
LL_INFOS() << "Decoded " << total_decoded << " msgs this frame!" << LL_ENDL;
gPrintMessagesThisFrame = false;
}
}
add(LLStatViewer::NUM_NEW_OBJECTS, gObjectList.mNumNewObjects);

View File

@ -400,7 +400,6 @@ extern std::string gLastVersionChannel;
extern LLVector3 gWindVec;
extern LLVector3 gRelativeWindVec;
extern U32 gPacketsIn;
extern bool gPrintMessagesThisFrame;
extern bool gRandomizeFramerate;
extern bool gPeriodicSlowFrame;

View File

@ -623,21 +623,6 @@ bool idle_startup()
F32 dropPercent = gSavedSettings.getF32("PacketDropPercentage");
msg->mPacketRing.setDropPercentage(dropPercent);
F32 inBandwidth = gSavedSettings.getF32("InBandwidth");
F32 outBandwidth = gSavedSettings.getF32("OutBandwidth");
if (inBandwidth != 0.f)
{
LL_DEBUGS("AppInit") << "Setting packetring incoming bandwidth to " << inBandwidth << LL_ENDL;
msg->mPacketRing.setUseInThrottle(true);
msg->mPacketRing.setInBandwidth(inBandwidth);
}
if (outBandwidth != 0.f)
{
LL_DEBUGS("AppInit") << "Setting packetring outgoing bandwidth to " << outBandwidth << LL_ENDL;
msg->mPacketRing.setUseOutThrottle(true);
msg->mPacketRing.setOutBandwidth(outBandwidth);
}
}
LL_INFOS("AppInit") << "Message System Initialized." << LL_ENDL;