diff --git a/indra/llmessage/llcircuit.cpp b/indra/llmessage/llcircuit.cpp index c2b1a2f069..eaee7b0112 100644 --- a/indra/llmessage/llcircuit.cpp +++ b/indra/llmessage/llcircuit.cpp @@ -346,8 +346,7 @@ S32 LLCircuitData::resendUnackedPackets(const F64Seconds now) packetp->mBuffer[0] |= LL_RESENT_FLAG; // tag packet id as being a resend - gMessageSystem->mPacketRing.sendPacket(packetp->mSocket, - (char *)packetp->mBuffer, packetp->mBufferLength, + gMessageSystem->sendPacketToSocket((char *)packetp->mBuffer, packetp->mBufferLength, packetp->mHost); mThrottles.throttleOverflow(TC_RESEND, packetp->mBufferLength * 8.f); @@ -973,7 +972,7 @@ bool LLCircuitData::updateWatchDogTimers(LLMessageSystem *msgsys) { // let's call this one a loss! mPacketsLost++; - gMessageSystem->mDroppedPackets++; + gMessageSystem->mLostPackets++; if(gMessageSystem->mVerboseLog) { std::ostringstream str; diff --git a/indra/llmessage/llpacketbuffer.h b/indra/llmessage/llpacketbuffer.h index ac4012d330..b0505e7afd 100644 --- a/indra/llmessage/llpacketbuffer.h +++ b/indra/llmessage/llpacketbuffer.h @@ -38,6 +38,9 @@ class LLPacketBuffer LLPacketBuffer(S32 hSocket); // receive a packet ~LLPacketBuffer(); + LLPacketBuffer(const LLPacketBuffer&) = default; + LLPacketBuffer& operator=(const LLPacketBuffer&) = default; + S32 getSize() const { return mSize; } const char *getData() const { return mData; } LLHost getHost() const { return mHost; } diff --git a/indra/llmessage/llpacketring.cpp b/indra/llmessage/llpacketring.cpp index b8284334ea..bade413e61 100644 --- a/indra/llmessage/llpacketring.cpp +++ b/indra/llmessage/llpacketring.cpp @@ -28,344 +28,121 @@ #include "llpacketring.h" -#if LL_WINDOWS - #include -#else - #include - #include -#endif - -// linden library includes #include "llerror.h" -#include "lltimer.h" -#include "llproxy.h" -#include "llrand.h" -#include "message.h" -#include "u64.h" -constexpr S16 MAX_BUFFER_RING_SIZE = 1024; +constexpr S16 MAX_BUFFER_RING_SIZE = 8192; + +// DANGER: don't adjust DEFAULT_BUFFER_RING_SIZE unless you know what +// you're doing. Its value affects the "buffer load rate" which is used +// to supply backpressure to an overloaded nework queue. constexpr S16 DEFAULT_BUFFER_RING_SIZE = 256; -LLPacketRing::LLPacketRing () - : mPacketRing(DEFAULT_BUFFER_RING_SIZE, nullptr) +LLPacketRing::LLPacketRing() + : mRing(DEFAULT_BUFFER_RING_SIZE, nullptr) { LLHost invalid_host; - for (size_t i = 0; i < mPacketRing.size(); ++i) + for (size_t i = 0; i < mRing.size(); ++i) { - mPacketRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0); + mRing[i] = new LLPacketBuffer(invalid_host, nullptr, 0); } } -LLPacketRing::~LLPacketRing () +LLPacketRing::~LLPacketRing() { - for (auto packet : mPacketRing) + for (auto* packet : mRing) { delete packet; } - mPacketRing.clear(); + mRing.clear(); mNumBufferedPackets = 0; mNumBufferedBytes = 0; mHeadIndex = 0; } -S32 LLPacketRing::receivePacket (S32 socket, char *datap) -{ - bool drop = computeDrop(); - return (mNumBufferedPackets > 0) ? - receiveOrDropBufferedPacket(datap, drop) : - receiveOrDropPacket(socket, datap, drop); -} - -bool send_packet_helper(int socket, const char * datap, S32 data_size, LLHost host) -{ - if (!LLProxy::isSOCKSProxyEnabled()) - { - return send_packet(socket, datap, data_size, host.getAddress(), host.getPort()); - } - - char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; - - proxywrap_t *socks_header = static_cast(static_cast(&headered_send_buffer)); - socks_header->rsv = 0; - socks_header->addr = host.getAddress(); - socks_header->port = htons(host.getPort()); - socks_header->atype = ADDRESS_IPV4; - socks_header->frag = 0; - - memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size); - - return send_packet( socket, - headered_send_buffer, - data_size + SOCKS_HEADER_SIZE, - LLProxy::getInstance()->getUDPProxy().getAddress(), - LLProxy::getInstance()->getUDPProxy().getPort()); -} - -bool LLPacketRing::sendPacket(int socket, const char * datap, S32 data_size, LLHost host) +void LLPacketRing::pushPacket(const LLPacketBuffer& packet) { - mActualBytesOut += data_size; - return send_packet_helper(socket, datap, data_size, host); -} - -void LLPacketRing::dropPackets (U32 num_to_drop) -{ - mPacketsToDrop += num_to_drop; -} - -void LLPacketRing::setDropPercentage (F32 percent_to_drop) -{ - mDropPercentage = percent_to_drop; -} - -bool LLPacketRing::computeDrop() -{ - bool drop= (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage)); - if (drop) - { - ++mPacketsToDrop; - } - if (mPacketsToDrop > 0) - { - --mPacketsToDrop; - drop = true; - } - return drop; -} - -S32 LLPacketRing::receiveOrDropPacket(S32 socket, char *datap, bool drop) -{ - S32 packet_size = 0; - - // pull straight from socket - if (LLProxy::isSOCKSProxyEnabled()) + S16 ring_size = (S16)mRing.size(); + if (mNumBufferedPackets >= ring_size && ring_size < MAX_BUFFER_RING_SIZE) { - char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ - packet_size = receive_packet(socket, buffer); - if (packet_size > 0) - { - mActualBytesIn += packet_size; - } - - if (packet_size > SOCKS_HEADER_SIZE) - { - if (drop) - { - packet_size = 0; - } - else - { - // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) - packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size - memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size); - proxywrap_t * header = static_cast(static_cast(buffer)); - mLastSender.setAddress(header->addr); - mLastSender.setPort(ntohs(header->port)); - mLastReceivingIF = ::get_receiving_interface(); - } - } - else - { - packet_size = 0; - } - } - else - { - packet_size = receive_packet(socket, datap); - if (packet_size > 0) - { - mActualBytesIn += packet_size; - if (drop) - { - packet_size = 0; - } - else - { - mLastSender = ::get_sender(); - mLastReceivingIF = ::get_receiving_interface(); - } - } + expandRing(); + ring_size = (S16)mRing.size(); } - return packet_size; -} -S32 LLPacketRing::receiveOrDropBufferedPacket(char *datap, bool drop) -{ - assert(mNumBufferedPackets > 0); - S32 packet_size = 0; + LLPacketBuffer* slot = mRing[mHeadIndex]; + S32 old_size = slot->getSize(); - S16 ring_size = (S16)(mPacketRing.size()); - S16 packet_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size; - LLPacketBuffer* packet = mPacketRing[packet_index]; - packet_size = packet->getSize(); - mLastSender = packet->getHost(); - mLastReceivingIF = packet->getReceivingInterface(); + *slot = packet; - --mNumBufferedPackets; - mNumBufferedBytes -= packet_size; - if (mNumBufferedPackets == 0) - { - assert(mNumBufferedBytes == 0); - } + mHeadIndex = (mHeadIndex + 1) % ring_size; - if (!drop) + if (mNumBufferedPackets < ring_size) { - if (packet_size > 0) - { - memcpy(datap, packet->getData(), packet_size); - } - else - { - assert(false); - } + ++mNumBufferedPackets; + mNumBufferedBytes += packet.getSize(); } else { - packet_size = 0; + // Ring is at maximum capacity; oldest packet was overwritten. + // This is VERY BAD because we've already ACKed the packet we're loosing + // (if it was "reliable"). + LL_WARNS("PacketRing") << "buffer overflow at " << mNumBufferedPackets << " packets" << LL_ENDL; + mNumBufferedBytes += packet.getSize() - old_size; } - return packet_size; } -S32 LLPacketRing::bufferInboundPacket(S32 socket) +bool LLPacketRing::popPacket(LLPacketBuffer& packet) { - if (mNumBufferedPackets == mPacketRing.size() && mNumBufferedPackets < MAX_BUFFER_RING_SIZE) + if (mNumBufferedPackets <= 0) { - expandRing(); + return false; } - LLPacketBuffer* packet = mPacketRing[mHeadIndex]; - S32 old_packet_size = packet->getSize(); - S32 packet_size = 0; - if (LLProxy::isSOCKSProxyEnabled()) - { - char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ - packet_size = receive_packet(socket, buffer); - if (packet_size > 0) - { - mActualBytesIn += packet_size; - if (packet_size > SOCKS_HEADER_SIZE) - { - // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) + S16 ring_size = (S16)mRing.size(); + S16 tail_index = (mHeadIndex + ring_size - mNumBufferedPackets) % ring_size; - proxywrap_t * header = static_cast(static_cast(buffer)); - LLHost sender; - sender.setAddress(header->addr); - sender.setPort(ntohs(header->port)); + LLPacketBuffer* slot = mRing[tail_index]; + S32 packet_size = slot->getSize(); - packet_size -= SOCKS_HEADER_SIZE; // The unwrapped packet size - packet->init(buffer + SOCKS_HEADER_SIZE, packet_size, sender); + packet = *slot; - mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); - if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) - { - ++mNumBufferedPackets; - mNumBufferedBytes += packet_size; - } - else - { - // we overwrote an older packet - mNumBufferedBytes += packet_size - old_packet_size; - } - } - else - { - packet_size = 0; - } - } - } - else - { - packet->init(socket); - packet_size = packet->getSize(); - if (packet_size > 0) - { - mActualBytesIn += packet_size; + --mNumBufferedPackets; + mNumBufferedBytes -= packet_size; - mHeadIndex = (mHeadIndex + 1) % (S16)(mPacketRing.size()); - if (mNumBufferedPackets < MAX_BUFFER_RING_SIZE) - { - ++mNumBufferedPackets; - mNumBufferedBytes += packet_size; - } - else - { - // we overwrote an older packet - mNumBufferedBytes += packet_size - old_packet_size; - } - } - } - return packet_size; -} + llassert(mNumBufferedPackets > 0 || mNumBufferedBytes == 0); -S32 LLPacketRing::drainSocket(S32 socket) -{ - // drain into buffer - S32 packet_size = 1; - S32 num_loops = 0; - S32 old_num_packets = mNumBufferedPackets; - while (packet_size > 0) - { - packet_size = bufferInboundPacket(socket); - ++num_loops; - } - S32 num_dropped_packets = (num_loops - 1 + old_num_packets) - mNumBufferedPackets; - if (num_dropped_packets > 0) - { - // It will eventually be accounted by mDroppedPackets - // and mPacketsLost, but track it here for logging purposes. - mNumDroppedPackets += num_dropped_packets; - } - return (S32)(mNumBufferedPackets); + return true; } bool LLPacketRing::expandRing() { - // compute larger size - constexpr S16 BUFFER_RING_EXPANSION = 256; - S16 old_size = (S16)(mPacketRing.size()); + constexpr S16 BUFFER_RING_EXPANSION = 512; + S16 old_size = (S16)mRing.size(); S16 new_size = llmin(old_size + BUFFER_RING_EXPANSION, MAX_BUFFER_RING_SIZE); if (new_size == old_size) { - // mPacketRing is already maxed out return false; } - // make a larger ring and copy packet pointers + // Lay existing entries out linearly in FIFO order starting at index 0. std::vector new_ring(new_size, nullptr); for (S16 i = 0; i < old_size; ++i) { S16 j = (mHeadIndex + i) % old_size; - new_ring[i] = mPacketRing[j]; + new_ring[i] = mRing[j]; } - // allocate new packets for the remainder of new_ring LLHost invalid_host; for (S16 i = old_size; i < new_size; ++i) { new_ring[i] = new LLPacketBuffer(invalid_host, nullptr, 0); } - // swap the rings and reset mHeadIndex - mPacketRing.swap(new_ring); + mRing.swap(new_ring); mHeadIndex = mNumBufferedPackets; return true; } F32 LLPacketRing::getBufferLoadRate() const { - // goes up to MAX_BUFFER_RING_SIZE return (F32)mNumBufferedPackets / (F32)DEFAULT_BUFFER_RING_SIZE; } - -void LLPacketRing::dumpPacketRingStats() -{ - mNumDroppedPacketsTotal += mNumDroppedPackets; - LL_INFOS("Messaging") << "Packet ring stats: " << std::endl - << "Buffered packets: " << mNumBufferedPackets << std::endl - << "Buffered bytes: " << mNumBufferedBytes << std::endl - << "Dropped packets current: " << mNumDroppedPackets << std::endl - << "Dropped packets total: " << mNumDroppedPacketsTotal << std::endl - << "Dropped packets percentage: " << mDropPercentage << "%" << std::endl - << "Actual in bytes: " << mActualBytesIn << std::endl - << "Actual out bytes: " << mActualBytesOut << LL_ENDL; - mNumDroppedPackets = 0; -} diff --git a/indra/llmessage/llpacketring.h b/indra/llmessage/llpacketring.h index 572dcbd271..5315e5a5bd 100644 --- a/indra/llmessage/llpacketring.h +++ b/indra/llmessage/llpacketring.h @@ -1,7 +1,16 @@ /** * @file llpacketring.h - * @brief definition of LLPacketRing class for implementing a resend, - * drop, or delay in packet transmissions + * @brief LLPacketRing: a simple ring buffer for LLPacketBuffers. + * + * LLPacketRing stores incoming UDP packets that have already been received + * from the network socket. It has no socket or proxy awareness; callers + * push packets in with pushPacket() and retrieve them in FIFO order with + * popPacket(). + * + * The ring starts at DEFAULT_BUFFER_RING_SIZE slots and grows in increments + * of BUFFER_RING_EXPANSION up to MAX_BUFFER_RING_SIZE. Once at the ceiling, + * pushPacket() silently overwrites the oldest queued packet to make room for + * the incoming one. * * $LicenseInfo:firstyear=2001&license=viewerlgpl$ * Second Life Viewer Source Code @@ -29,9 +38,7 @@ #include -#include "llhost.h" #include "llpacketbuffer.h" -#include "llthrottle.h" class LLPacketRing @@ -40,71 +47,26 @@ class LLPacketRing LLPacketRing(); ~LLPacketRing(); - // receive one packet: either buffered or from the socket - S32 receivePacket (S32 socket, char *datap); - - // send one packet - bool sendPacket(int h_socket, const char * send_buffer, S32 buf_size, LLHost host); - - // drains packets from socket and returns final mNumBufferedPackets - S32 drainSocket(S32 socket); + // Copy 'packet' onto the tail of the ring, growing the ring or + // overwriting the oldest entry when the ring is at capacity. + void pushPacket(const LLPacketBuffer& packet); - void dropPackets(U32); - void setDropPercentage (F32 percent_to_drop); - - inline LLHost getLastSender() const; - inline LLHost getLastReceivingInterface() const; - - S32 getActualInBytes() const { return mActualBytesIn; } - S32 getActualOutBytes() const { return mActualBytesOut; } - S32 getAndResetActualInBits() { S32 bits = mActualBytesIn * 8; mActualBytesIn = 0; return bits;} - S32 getAndResetActualOutBits() { S32 bits = mActualBytesOut * 8; mActualBytesOut = 0; return bits;} + // Copy the head (oldest) packet into 'packet'. + // Returns true if a packet was available, false if the ring was empty. + bool popPacket(LLPacketBuffer& packet); S32 getNumBufferedPackets() const { return (S32)(mNumBufferedPackets); } - S32 getNumBufferedBytes() const { return mNumBufferedBytes; } - S32 getNumDroppedPackets() const { return mNumDroppedPacketsTotal + mNumDroppedPackets; } - - F32 getBufferLoadRate() const; // from 0 to 4 (0 - empty, 1 - default size is full) - void dumpPacketRingStats(); -protected: - // returns 'true' if we should intentionally drop a packet - bool computeDrop(); - - // returns packet_size of received packet, zero or less if no packet found - S32 receiveOrDropPacket(S32 socket, char *datap, bool drop); - S32 receiveOrDropBufferedPacket(char *datap, bool drop); + S32 getNumBufferedBytes() const { return mNumBufferedBytes; } - // returns packet_size of packet buffered - S32 bufferInboundPacket(S32 socket); + // Ratio of buffered packets to DEFAULT_BUFFER_RING_SIZE (0 = empty, 1 = nominal full). + F32 getBufferLoadRate() const; - // returns 'true' if ring was expanded +private: + // Returns true if the ring was expanded, false if already at the ceiling. bool expandRing(); -protected: - std::vector mPacketRing; - S16 mHeadIndex { 0 }; + std::vector mRing; + S16 mHeadIndex { 0 }; S16 mNumBufferedPackets { 0 }; - S32 mNumDroppedPackets { 0 }; - S32 mNumDroppedPacketsTotal { 0 }; - S32 mNumBufferedBytes { 0 }; - - S32 mActualBytesIn { 0 }; - S32 mActualBytesOut { 0 }; - F32 mDropPercentage { 0.0f }; // % of inbound packets to drop - U32 mPacketsToDrop { 0 }; // drop next inbound n packets - - // These are the sender and receiving_interface for the last packet delivered by receivePacket() - LLHost mLastSender; - LLHost mLastReceivingIF; + S32 mNumBufferedBytes { 0 }; }; - - -inline LLHost LLPacketRing::getLastSender() const -{ - return mLastSender; -} - -inline LLHost LLPacketRing::getLastReceivingInterface() const -{ - return mLastReceivingIF; -} diff --git a/indra/llmessage/message.cpp b/indra/llmessage/message.cpp index e2937490ba..b11e4598e7 100644 --- a/indra/llmessage/message.cpp +++ b/indra/llmessage/message.cpp @@ -78,6 +78,8 @@ #include "lltransfertargetvfile.h" #include "llcorehttputil.h" #include "llpounceable.h" +#include "llproxy.h" +#include "llrand.h" // Constants //const char* MESSAGE_LOG_FILENAME = "message.log"; @@ -173,7 +175,7 @@ void LLMessageSystem::init() mTotalBytesIn = 0; mTotalBytesOut = 0; - mDroppedPackets = 0; // total dropped packets in + mLostPackets = 0; // total lost packets out mResentPackets = 0; // total resent packets out mFailedResendPackets = 0; // total resend failure packets out mOffCircuitPackets = 0; // total # of off-circuit packets rejected @@ -184,6 +186,13 @@ void LLMessageSystem::init() mIncomingCompressedSize = 0; mCurrentRecvPacketID = 0; + mActualBytesIn = 0; + mActualBytesOut = 0; + mDropPercentage = 0.0f; + mPacketsToDrop = 0; + mNumDroppedPackets = 0; + mNumDroppedPacketsTotal = 0; + mMessageFileVersionNumber = 0.f; mTimingCallback = NULL; @@ -508,18 +517,16 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) bool recv_reliable = false; bool recv_resent = false; - S32 acks = 0; + S32 num_acks = 0; S32 true_rcv_size = 0; U8* buffer = mTrueReceiveBuffer; - mTrueReceiveSize = mPacketRing.receivePacket(mSocket, (char *)mTrueReceiveBuffer); + mTrueReceiveSize = receivePacketOrDrop((char *)mTrueReceiveBuffer); // If you want to dump all received packets into SecondLife.log, uncomment this //dumpPacketToLog(); receive_size = mTrueReceiveSize; - mLastSender = mPacketRing.getLastSender(); - mLastReceivingIF = mPacketRing.getLastReceivingInterface(); if (receive_size < (S32) LL_MINIMUM_VALID_PACKET_SIZE) { @@ -535,24 +542,23 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) } else { - LLHost host; - LLCircuitData* cdp; - - // note if packet acks are appended. + // handle any packet ACKs (for outbound messages) found at tail of inbound message if(buffer[0] & LL_ACK_FLAG) { - acks += buffer[--receive_size]; + // Note: these ACKs may have already been handled if this was a buffered message + // but it doesn't hurt to handle them again. + num_acks += buffer[--receive_size]; true_rcv_size = receive_size; - if(receive_size >= ((S32)(acks * sizeof(TPACKETID) + LL_MINIMUM_VALID_PACKET_SIZE))) + if(receive_size >= ((S32)(num_acks * sizeof(TPACKETID) + LL_MINIMUM_VALID_PACKET_SIZE))) { - receive_size -= acks * sizeof(TPACKETID); + receive_size -= num_acks * sizeof(TPACKETID); } else { // mal-formed packet. ignore it and continue with // the next one LL_WARNS("Messaging") << "Malformed packet received. Packet size " - << receive_size << " with invalid no. of acks " << acks + << receive_size << " with invalid no. of acks " << num_acks << LL_ENDL; valid_packet = false; continue; @@ -562,20 +568,20 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) // process the message as normal mIncomingCompressedSize = zeroCodeExpand(&buffer, &receive_size); mCurrentRecvPacketID = ntohl(*((U32*)(&buffer[1]))); - host = getSender(); + LLHost host = getSender(); const bool resetPacketId = true; - cdp = findCircuit(host, resetPacketId); + LLCircuitData* cdp = findCircuit(host, resetPacketId); // At this point, cdp is now a pointer to the circuit that // this message came in on if it's valid, and NULL if the // circuit was bogus. - if(cdp && (acks > 0) && ((S32)(acks * sizeof(TPACKETID)) < (true_rcv_size))) + if(cdp && (num_acks > 0) && ((S32)(num_acks * sizeof(TPACKETID)) < (true_rcv_size))) { TPACKETID packet_id; U32 mem_id=0; - for(S32 i = 0; i < acks; ++i) + for(S32 i = 0; i < num_acks; ++i) { true_rcv_size -= sizeof(TPACKETID); memcpy(&mem_id, &mTrueReceiveBuffer[true_rcv_size], /* Flawfinder: ignore*/ @@ -630,7 +636,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) str << tbuf << "(unknown)" << (recv_reliable ? " reliable" : "") << " resent " - << ((acks > 0) ? "acks" : "") + << ((num_acks > 0) ? "acks" : "") << " DISCARD DUPLICATE"; LL_INFOS("Messaging") << str.str() << LL_ENDL; } @@ -680,7 +686,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) if ( valid_packet ) { - logValidMsg(cdp, host, recv_reliable, recv_resent, acks>0 ); + logValidMsg(cdp, host, recv_reliable, recv_resent, num_acks>0 ); valid_packet = mTemplateMessageReader->readMessage(buffer, host); } @@ -724,7 +730,7 @@ bool LLMessageSystem::checkMessages(LockMessageChecker&, S64 frame_count ) // Check to see if we need to print debug info if ((mt_sec - mCircuitPrintTime) > mCircuitPrintFreq) { - mPacketRing.dumpPacketRingStats(); + dumpPacketRingStats(); dumpCircuitInfo(); mCircuitPrintTime = mt_sec; } @@ -749,6 +755,10 @@ S32 LLMessageSystem::getReceiveBytes() const } } +F32 LLMessageSystem::getBufferLoadRate() const +{ + return llmax(mHighPriorityInbound.getBufferLoadRate(), mLowPriorityInbound.getBufferLoadRate()); +} void LLMessageSystem::processAcks(LockMessageChecker&, F32 collect_time) { @@ -822,7 +832,264 @@ void LLMessageSystem::processAcks(LockMessageChecker&, F32 collect_time) S32 LLMessageSystem::drainUdpSocket() { - return mPacketRing.drainSocket(mSocket); + S32 packet_size = 1; + S32 num_loops = 0; + S32 old_num_buffered_packets = getNumBufferedPackets(); + while (packet_size > 0) + { + packet_size = bufferInboundPacket(); + ++num_loops; + } + S32 num_dropped_packets = (num_loops - 1 + old_num_buffered_packets) - getNumBufferedPackets(); + if (num_dropped_packets > 0) + { + mNumDroppedPackets += num_dropped_packets; + } + return getNumBufferedPackets(); +} + +bool LLMessageSystem::computeDrop() +{ + bool drop = (mDropPercentage > 0.0f && (ll_frand(100.f) < mDropPercentage)); + if (drop) + { + ++mPacketsToDrop; + } + if (mPacketsToDrop > 0) + { + --mPacketsToDrop; + drop = true; + } + return drop; +} + +bool LLMessageSystem::isHighPriorityMessage(const LLPacketBuffer& pkt) const +{ + S32 size = pkt.getSize(); + if (size < LL_PACKET_ID_SIZE + 1) + { + return false; + } + + // We want to prioritize crucial messages use to establish viewer <--> simulator connection, + // which are all low-frequency. A simple approximation is to just prioritize all non high- + // frequency messages. + // + // High frequency messages use a single byte for message_id whereas all low- and medium- + // frequency messages have 255 at the first byte of the message_id (which is after the + // LL_PACKET_ID_SIZE bytes of packet_id). + return *((const U8*)pkt.getData() + LL_PACKET_ID_SIZE) == 255; +} + +void LLMessageSystem::dropPackets(U32 num_to_drop) +{ + mPacketsToDrop += num_to_drop; +} + +void LLMessageSystem::setDropPercentage(F32 percent_to_drop) +{ + mDropPercentage = percent_to_drop; +} + +S32 LLMessageSystem::receivePacketOrDrop(char* datap) +{ + if (getNumBufferedPackets() > 0) + { + LLHost invalid_host; + LLPacketBuffer pkt(invalid_host, nullptr, 0); + if (!mHighPriorityInbound.popPacket(pkt)) + { + mLowPriorityInbound.popPacket(pkt); + } + + S32 packet_size = pkt.getSize(); + mLastSender = pkt.getHost(); + mLastReceivingIF = pkt.getReceivingInterface(); + + if (packet_size > 0) + { + memcpy(datap, pkt.getData(), packet_size); + } + return packet_size; + } + + // Read directly from the socket. + bool drop = computeDrop(); + S32 packet_size = 0; + if (LLProxy::isSOCKSProxyEnabled()) + { + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(mSocket, buffer); + if (packet_size > 0) + { + mActualBytesIn += packet_size; + } + if (packet_size > SOCKS_HEADER_SIZE) + { + if (drop) + { + packet_size = 0; + } + else + { + // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) + packet_size -= SOCKS_HEADER_SIZE; + memcpy(datap, buffer + SOCKS_HEADER_SIZE, packet_size); + proxywrap_t* header = static_cast(static_cast(buffer)); + mLastSender.setAddress(header->addr); + mLastSender.setPort(ntohs(header->port)); + mLastReceivingIF = ::get_receiving_interface(); + } + } + else + { + packet_size = 0; + } + } + else + { + packet_size = receive_packet(mSocket, datap); + if (packet_size > 0) + { + mActualBytesIn += packet_size; + if (drop) + { + packet_size = 0; + } + else + { + mLastSender = ::get_sender(); + mLastReceivingIF = ::get_receiving_interface(); + } + } + } + return packet_size; +} + +S32 LLMessageSystem::bufferInboundPacket() +{ + LLHost invalid_host; + LLPacketBuffer pkt(invalid_host, nullptr, 0); + S32 packet_size = 0; + + if (LLProxy::isSOCKSProxyEnabled()) + { + char buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; /* Flawfinder ignore */ + packet_size = receive_packet(mSocket, buffer); + if (packet_size > 0) + { + mActualBytesIn += packet_size; + if (packet_size > SOCKS_HEADER_SIZE) + { + // *FIX We are assuming ATYP is 0x01 (IPv4), not 0x03 (hostname) or 0x04 (IPv6) + proxywrap_t* header = static_cast(static_cast(buffer)); + LLHost sender; + sender.setAddress(header->addr); + sender.setPort(ntohs(header->port)); + packet_size -= SOCKS_HEADER_SIZE; + pkt.init(buffer + SOCKS_HEADER_SIZE, packet_size, sender); + } + else + { + packet_size = 0; + } + } + } + else + { + pkt.init(mSocket); + packet_size = pkt.getSize(); + if (packet_size > 0) + { + mActualBytesIn += packet_size; + } + } + + if (packet_size >= (S32)LL_MINIMUM_VALID_PACKET_SIZE && !computeDrop()) + { + const char* data = pkt.getData(); + LLCircuitData* cdp = mCircuitInfo.findCircuit(pkt.getHost()); + TPACKETID recv_packet_id = ntohl(*((U32*)(&data[1]))); + + // Harvest piggybacked ACKs for outbound messages from the packet tail of this inbound message + if (cdp && (data[0] & LL_ACK_FLAG)) + { + U8 num_acks = (U8)data[packet_size - 1]; + S32 true_rcv_size = packet_size - 1; + if (true_rcv_size >= (S32)(num_acks * sizeof(TPACKETID) + LL_MINIMUM_VALID_PACKET_SIZE)) + { + TPACKETID ack_id; + U32 mem_id = 0; + for (S32 i = 0; i < num_acks; ++i) + { + true_rcv_size -= sizeof(TPACKETID); + memcpy(&mem_id, &data[true_rcv_size], sizeof(TPACKETID)); /* Flawfinder: ignore */ + ack_id = ntohl(mem_id); + cdp->ackReliablePacket(ack_id); + } + if (!cdp->getUnackedPacketCount()) + { + mCircuitInfo.mUnackedCircuitMap.erase(cdp->mHost); + } + } + } + + // ACK inbound reliable packet ASAP + if (cdp && (data[0] & LL_RELIABLE_FLAG)) + { + cdp->collectRAck(recv_packet_id); + } + + if (isHighPriorityMessage(pkt)) + { + mHighPriorityInbound.pushPacket(pkt); + } + else + { + mLowPriorityInbound.pushPacket(pkt); + } + } + + return packet_size; +} + +bool LLMessageSystem::sendPacketToSocket(const char* datap, S32 data_size, LLHost host) +{ + mActualBytesOut += data_size; + if (!LLProxy::isSOCKSProxyEnabled()) + { + return send_packet(mSocket, datap, data_size, host.getAddress(), host.getPort()); + } + + char headered_send_buffer[NET_BUFFER_SIZE + SOCKS_HEADER_SIZE]; + + proxywrap_t* socks_header = static_cast(static_cast(&headered_send_buffer)); + socks_header->rsv = 0; + socks_header->addr = host.getAddress(); + socks_header->port = htons(host.getPort()); + socks_header->atype = ADDRESS_IPV4; + socks_header->frag = 0; + + memcpy(headered_send_buffer + SOCKS_HEADER_SIZE, datap, data_size); + + return send_packet(mSocket, + headered_send_buffer, + data_size + SOCKS_HEADER_SIZE, + LLProxy::getInstance()->getUDPProxy().getAddress(), + LLProxy::getInstance()->getUDPProxy().getPort()); +} + +void LLMessageSystem::dumpPacketRingStats() +{ + mNumDroppedPacketsTotal += mNumDroppedPackets; + LL_INFOS("Messaging") << "buffered_packets=" << getNumBufferedPackets() + << "buffered_bytes=" << (mHighPriorityInbound.getNumBufferedBytes() + mLowPriorityInbound.getNumBufferedBytes()) + << "recently_dropped=" << mNumDroppedPackets + << "total_dropped=" << mNumDroppedPacketsTotal + << "dropped_percentage=" << mDropPercentage << "%" + << "bytes_IN=" << mActualBytesIn + << "bytes_OUT=" << mActualBytesOut << LL_ENDL; + mNumDroppedPackets = 0; } void LLMessageSystem::copyMessageReceivedToSend() @@ -1277,7 +1544,7 @@ S32 LLMessageSystem::sendMessage(const LLHost &host) } bool success; - success = mPacketRing.sendPacket(mSocket, (char *)buf_ptr, buffer_length, host); + success = sendPacketToSocket((char *)buf_ptr, buffer_length, host); if (!success) { @@ -2607,7 +2874,7 @@ void LLMessageSystem::summarizeLogs(std::ostream& str) str << buffer << std::endl << std::endl; buffer = llformat( "SendPacket failures: %20d", mSendPacketFailureCount); str << buffer << std::endl; - buffer = llformat( "Dropped packets: %20d", mDroppedPackets); + buffer = llformat( "Dropped packets: %20d", getTotalNumDroppedPackets()); str << buffer << std::endl; buffer = llformat( "Resent packets: %20d", mResentPackets); str << buffer << std::endl; @@ -3333,7 +3600,7 @@ void LLMessageSystem::establishBidirectionalTrust(const LLHost &host, S64 frame_ void LLMessageSystem::dumpPacketToLog() { - LL_WARNS("Messaging") << "Packet Dump from:" << mPacketRing.getLastSender() << LL_ENDL; + LL_WARNS("Messaging") << "Packet Dump from:" << mLastSender << LL_ENDL; LL_WARNS("Messaging") << "Packet Size:" << mTrueReceiveSize << LL_ENDL; char line_buffer[256]; /* Flawfinder: ignore */ S32 i; diff --git a/indra/llmessage/message.h b/indra/llmessage/message.h index 14cdc48a07..e81cfe16a9 100644 --- a/indra/llmessage/message.h +++ b/indra/llmessage/message.h @@ -119,6 +119,49 @@ class LLMessageStringTable : public LLSingleton // Repeat for number of messages in file // +// UDP Packet Buffer Layout +// +// Every UDP message sent or received by LLMessageSystem uses the following +// on-wire layout. Offsets are defined in EPacketHeaderLayout below. +// +// Byte(s) Name Description +// ------- ---- ----------- +// 0 Flags Bit-field (see flag constants below): +// 0x80 LL_ZERO_CODE_FLAG – body is zero-run-length encoded +// 0x40 LL_RELIABLE_FLAG – sender expects a packet ACK +// 0x20 LL_RESENT_FLAG – this is a retransmission +// 0x10 LL_ACK_FLAG – piggybacked ACKs are appended +// at the tail of this packet +// 1-4 Packet ID Sequence number, U32 in network (big-endian) byte order. +// 5 Offset Byte offset from PHL_NAME to the start of the message +// body (past the message-ID bytes). Zero for most messages. +// 6+ Message ID Variable-length message type identifier: +// High-frequency (1 byte, values 0x01–0xFE) +// Medium-frequency (2 bytes, 0xFF hh) +// Low-frequency (4 bytes, 0xFF 0xFF hh ll) +// ... Body Message block data, described by the message template. +// Present only when LL_ZERO_CODE_FLAG is clear; otherwise +// the body (everything after byte 5) is zero-coded (see +// below). The 6-byte header is never zero-coded. +// +// Optional ACK tail (present when LL_ACK_FLAG is set in the flags byte): +// +// ... ACK IDs N packet-sequence IDs being acknowledged, each a U32 in +// network byte order, packed contiguously immediately before +// the ACK count byte. Read in reverse: walk backwards from +// just before the count byte, 4 bytes at a time. +// last ACK Count U8 giving N, the number of appended ACK IDs (max 255). +// This is the very last byte of the UDP payload. +// +// Zero-coding (applied when LL_ZERO_CODE_FLAG is set): +// +// Runs of zero bytes in the body are replaced by a two-byte token: +// 0x00 N – represents (N + 1) zero bytes, for N in 1..254 +// 0x00 0x00 N – represents (256 + N) zero bytes (wrap/overflow case) +// A literal 0x00 byte that starts no run is encoded as 0x00 0x00 0x00. +// The six-byte packet header is excluded from zero-coding and is always +// transmitted as-is. + // Constants const S32 MAX_MESSAGE_INTERNAL_NAME_SIZE = 255; const S32 MAX_BUFFER_SIZE = NET_BUFFER_SIZE; @@ -291,8 +334,11 @@ class LLMessageSystem : public LLMessageSenderInterface bool mBlockUntrustedInterface; LLHost mUntrustedInterface; + protected: + LLPacketRing mHighPriorityInbound; + LLPacketRing mLowPriorityInbound; + public: - LLPacketRing mPacketRing; LLReliablePacketParams mReliablePacketParams; // Set this flag to true when you want *very* verbose logs. @@ -334,7 +380,7 @@ class LLMessageSystem : public LLMessageSenderInterface U32 mReliablePacketsIn; // total reliable packets in U32 mReliablePacketsOut; // total reliable packets out - U32 mDroppedPackets; // total dropped packets in + U32 mLostPackets; // total reliable outbound packets declared lost U32 mResentPackets; // total resent packets out U32 mFailedResendPackets; // total resend failure packets out U32 mOffCircuitPackets; // total # of off-circuit packets rejected @@ -420,6 +466,25 @@ class LLMessageSystem : public LLMessageSenderInterface // returns total number of buffered packets after the drain S32 drainUdpSocket(); + // Inbound Packet-loss simulation controls + void dropPackets(U32 num_to_drop); + void setDropPercentage(F32 percent_to_drop); + + // UDP byte-accounting + S32 getActualInBytes() const { return mActualBytesIn; } + S32 getActualOutBytes() const { return mActualBytesOut; } + S32 getAndResetActualInBits() { S32 bits = mActualBytesIn * 8; mActualBytesIn = 0; return bits; } + S32 getAndResetActualOutBits() { S32 bits = mActualBytesOut * 8; mActualBytesOut = 0; return bits; } + + // Get number of "dropped" inbound packets + S32 getTotalNumDroppedPackets() const { return mNumDroppedPacketsTotal + mNumDroppedPackets; } + + S32 getNumBufferedPackets() const { return mHighPriorityInbound.getNumBufferedPackets() + mLowPriorityInbound.getNumBufferedPackets(); } + void dumpPacketRingStats(); + + // Send datap to host via mSocket (with SOCKS proxy support if enabled). + bool sendPacketToSocket(const char* datap, S32 data_size, LLHost host); + bool isMessageFast(const char *msg); bool isMessage(const char *msg) { @@ -754,7 +819,7 @@ class LLMessageSystem : public LLMessageSenderInterface S32 getReceiveBytes() const; S32 getUnackedListSize() const { return mUnackedListSize; } - F32 getBufferLoadRate() const { return mPacketRing.getBufferLoadRate(); } + F32 getBufferLoadRate() const; //const char* getCurrentSMessageName() const { return mCurrentSMessageName; } //const char* getCurrentSBlockName() const { return mCurrentSBlockName; } @@ -898,6 +963,32 @@ class LLMessageSystem : public LLMessageSenderInterface S32 mIncomingCompressedSize; // original size of compressed msg (0 if uncomp.) TPACKETID mCurrentRecvPacketID; // packet ID of current receive packet (for reporting) + // Socket I/O helpers + + // Receive one packet: pop from ring if buffered, else read from mSocket. + // Sets mLastSender and mLastReceivingIF. + // Returns packet_size, or 0 if no packet or packet was dropped. + S32 receivePacketOrDrop(char* datap); + + // Read one raw packet from mSocket into inbound message queues + // Returns packet_size (0 if no packet was available). + S32 bufferInboundPacket(); + + // Returns true if the next inbound packet should be intentionally dropped. + bool computeDrop(); + + // Returns true if pkt carries a high-priority message and should be queued + // in mHighPriorityInbound. + bool isHighPriorityMessage(const LLPacketBuffer& pkt) const; + + // Packet-loss simulation and byte-accounting state + S32 mActualBytesIn; + S32 mActualBytesOut; + F32 mDropPercentage; // % of inbound packets to drop + U32 mPacketsToDrop; // drop next N inbound packets + S32 mNumDroppedPackets; // inbound + S32 mNumDroppedPacketsTotal;// inbound + LLMessageBuilder* mMessageBuilder; LLTemplateMessageBuilder* mTemplateMessageBuilder; LLSDMessageBuilder* mLLSDMessageBuilder; diff --git a/indra/llmessage/net.cpp b/indra/llmessage/net.cpp index 2be5a9e5b6..0df38d040a 100644 --- a/indra/llmessage/net.cpp +++ b/indra/llmessage/net.cpp @@ -326,7 +326,7 @@ S32 receive_packet(int hSocket, char * receiveBuffer) return 0; if (WSAECONNRESET == WSAGetLastError()) return 0; - LL_INFOS() << "receivePacket() failed, Error: " << WSAGetLastError() << LL_ENDL; + LL_INFOS() << "receive_packet() failed, Error: " << WSAGetLastError() << LL_ENDL; } return nRet; diff --git a/indra/newview/llappviewer.cpp b/indra/newview/llappviewer.cpp index 4bca656108..61412434ec 100644 --- a/indra/newview/llappviewer.cpp +++ b/indra/newview/llappviewer.cpp @@ -5577,7 +5577,6 @@ void LLAppViewer::idleNetwork() pingMainloopTimeout("idleNetwork"); gObjectList.mNumNewObjects = 0; - S32 total_decoded = 0; static LLCachedControl speed_test(gSavedSettings, "SpeedTest", false); if (!speed_test()) @@ -5585,64 +5584,56 @@ void LLAppViewer::idleNetwork() LL_PROFILE_ZONE_NAMED_CATEGORY_NETWORK("idle network"); //LL_RECORD_BLOCK_TIME(FTM_IDLE_NETWORK); // decode LLTimer check_message_timer; - // Read all available packets from network const S64 frame_count = gFrameCount; // U32->S64 - F32 total_time = 0.0f; + S32 total_decoded = 0; + // Process packets from network + LockMessageChecker lmc(gMessageSystem); + while (lmc.checkAllMessages(frame_count, gServicePump)) { - bool needs_drain = false; - LockMessageChecker lmc(gMessageSystem); - while (lmc.checkAllMessages(frame_count, gServicePump)) - { - if (gDoDisconnect) - { - // We're disconnecting, don't process any more messages from the server - // We're usually disconnecting due to either network corruption or a - // server going down, so this is OK. - break; - } - - total_decoded++; + ++total_decoded; - if (total_decoded > MESSAGE_MAX_PER_FRAME) + // Time-box processing of network packets to prevent framerate catastrophe + if (check_message_timer.getElapsedTimeF32() >= CheckMessagesMaxTime) + { + // Drain the socket buffer so we know how many messages remain to process + S32 num_buffered_packets = gMessageSystem->drainUdpSocket(); + if (num_buffered_packets > total_decoded) { - needs_drain = true; - break; + // Grow CheckMessagesMaxTime until we process more packets each frame than arrive. + // This might spiral out of control on very slow computers on fast networks when + // the bandwidth settings are too high. There is a mechanism for providing backpressure + // to network bandwidth but it may be inadequate for the task + // (see LLViewerThrottle::updateDynamicThrottle() for more details). + CheckMessagesMaxTime *= 1.035f; // 3.5% ~= 2x in 20 frames, ~8x in 60 frames } - - // Prevent slow packets from completely destroying the frame rate. - // This usually happens due to clumps of avatars taking huge amount - // of network processing time (which needs to be fixed, but this is - // a good limit anyway). - total_time = check_message_timer.getElapsedTimeF32(); - if (total_time >= CheckMessagesMaxTime) + else if (num_buffered_packets == 0) { - needs_drain = true; - break; + // Reset CheckMessagesMaxTime to default value + CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME; } + break; } - if (needs_drain || gMessageSystem->mPacketRing.getNumBufferedPackets() > 0) + + if (total_decoded > MESSAGE_MAX_PER_FRAME) { - // Rather than allow packets to silently backup on the socket - // we drain them into our own buffer so we know how many exist. - S32 num_buffered_packets = gMessageSystem->drainUdpSocket(); - if (num_buffered_packets > 0) - { - // Increase CheckMessagesMaxTime so that we will eventually catch up - CheckMessagesMaxTime *= 1.035f; // 3.5% ~= 2x in 20 frames, ~8x in 60 frames - } + // MESSAGE_MAX_PER_FRAME is very high (400) + // We expect to run out of time before reaching here, but just in case... + gMessageSystem->drainUdpSocket(); + break; } - else + + if (gDoDisconnect) { - // Reset CheckMessagesMaxTime to default value - CheckMessagesMaxTime = CHECK_MESSAGES_DEFAULT_MAX_TIME; + // We're disconnecting so no need to process packets. + break; } + } - // Handle per-frame message system processing. + // Handle per-frame message system processing. - static LLCachedControl ack_collection_time(gSavedSettings, "AckCollectTime", 0.1f); - lmc.processAcks(ack_collection_time()); - } + static LLCachedControl ack_collection_time(gSavedSettings, "AckCollectTime", 0.1f); + lmc.processAcks(ack_collection_time()); } add(LLStatViewer::NUM_NEW_OBJECTS, gObjectList.mNumNewObjects); diff --git a/indra/newview/llstartup.cpp b/indra/newview/llstartup.cpp index 018c23963c..90e80a693c 100644 --- a/indra/newview/llstartup.cpp +++ b/indra/newview/llstartup.cpp @@ -330,7 +330,7 @@ void do_startup_frame() break; } } - if (needs_drain || gMessageSystem->mPacketRing.getNumBufferedPackets() > 0) + if (needs_drain || gMessageSystem->getNumBufferedPackets() > 0) { gMessageSystem->drainUdpSocket(); } @@ -714,7 +714,7 @@ bool idle_startup() F32 dropPercent = gSavedSettings.getF32("PacketDropPercentage"); - msg->mPacketRing.setDropPercentage(dropPercent); + msg->setDropPercentage(dropPercent); } LL_INFOS("AppInit") << "Message System Initialized." << LL_ENDL; diff --git a/indra/newview/llviewermenu.cpp b/indra/newview/llviewermenu.cpp index dbcf4fbbf4..3b5df72819 100644 --- a/indra/newview/llviewermenu.cpp +++ b/indra/newview/llviewermenu.cpp @@ -2299,7 +2299,7 @@ class LLAdvancedDropPacket : public view_listener_t { bool handleEvent(const LLSD& userdata) { - gMessageSystem->mPacketRing.dropPackets(1); + gMessageSystem->dropPackets(1); return true; } }; diff --git a/indra/newview/llviewerstats.cpp b/indra/newview/llviewerstats.cpp index d39d466205..4861395a46 100644 --- a/indra/newview/llviewerstats.cpp +++ b/indra/newview/llviewerstats.cpp @@ -778,7 +778,7 @@ void send_viewer_stats(bool include_preferences) LLSD &fail = body["stats"]["failures"]; fail["send_packet"] = (S32) gMessageSystem->mSendPacketFailureCount; - fail["dropped"] = (S32) gMessageSystem->mDroppedPackets; + fail["dropped"] = (S32) gMessageSystem->getTotalNumDroppedPackets(); fail["resent"] = (S32) gMessageSystem->mResentPackets; fail["failed_resends"] = (S32) gMessageSystem->mFailedResendPackets; fail["off_circuit"] = (S32) gMessageSystem->mOffCircuitPackets; diff --git a/indra/newview/llviewerthrottle.cpp b/indra/newview/llviewerthrottle.cpp index 3ccfbea6e2..90a60c7a94 100644 --- a/indra/newview/llviewerthrottle.cpp +++ b/indra/newview/llviewerthrottle.cpp @@ -45,8 +45,8 @@ using namespace LLOldEvents; const F32 MAX_FRACTIONAL = 1.5f; const F32 MIN_FRACTIONAL = 0.2f; -const F32 MIN_BANDWIDTH = 50.f; -const F32 MAX_BANDWIDTH = 6000.f; +const F32 MIN_BANDWIDTH = 50.f; // Kbps +const F32 MAX_BANDWIDTH = 6000.f; // Kbps const F32 STEP_FRACTIONAL = 0.1f; const F32 HIGH_BUFFER_LOAD_TRESHOLD = 1.f; const F32 LOW_BUFFER_LOAD_TRESHOLD = 0.8f; @@ -244,6 +244,8 @@ void LLViewerThrottle::sendToSim() const F32 LLViewerThrottle::getMaxBandwidthKbps() { + // Why are thse different than the constants with the same names higher up? + // Anybody know? -- Leviathan constexpr F32 MIN_BANDWIDTH = 100.0f; // 100 Kbps constexpr F32 MAX_BANDWIDTH = 10000.0f; // 10 Mbps @@ -309,6 +311,21 @@ void LLViewerThrottle::resetDynamicThrottle() void LLViewerThrottle::updateDynamicThrottle() { + // User configurable bandwidth settings are merely vague aspirations. Translating those + // to what should be sent to the servers is complicated. Servers will tend to spike data + // transmission upon arrival, packets will arrive faster than we can process them, and this + // can overflow the buffer, which causes packet loss. + // + // In an attempt to avoid catastrophe we periodically measure buffer load and packet loss + // and then transmit a modified desired bandwidth to the server. Unfortunately, this system + // does not work well for spikey data bursts because: + // (1) the response takes too long (up to 5 seconds) to kick in + // (2) once it starts it tends to ramp up too slowly + // (3) it doesn't know which region is providing the flood; it just assumes it is + // all from the main region + // + // TODO: fix those ^^^ problems + if (mUpdateTimer.getElapsedTimeF32() < DYNAMIC_UPDATE_DURATION) { return; @@ -347,5 +364,7 @@ void LLViewerThrottle::updateDynamicThrottle() LL_INFOS() << "Easing network throttle to " << mCurrentBandwidth << LL_ENDL; } + // Now that we've used mBufferLoadRate we reset it to zero because otherwise it only + // increases (see clamping behavior in setBufferLoadRate()). mBufferLoadRate = 0; } diff --git a/indra/newview/llviewerthrottle.h b/indra/newview/llviewerthrottle.h index ef898a97d7..7ff0a7d1f0 100644 --- a/indra/newview/llviewerthrottle.h +++ b/indra/newview/llviewerthrottle.h @@ -77,8 +77,8 @@ class LLViewerThrottle static const std::string sNames[TC_EOF]; protected: - F32 mMaxBandwidth; - F32 mCurrentBandwidth; + F32 mMaxBandwidth; // bps + F32 mCurrentBandwidth; // bps F32 mBufferLoadRate = 0; LLViewerThrottleGroup mCurrent; diff --git a/indra/newview/llworld.cpp b/indra/newview/llworld.cpp index 47e1815bc2..f358efa9c0 100644 --- a/indra/newview/llworld.cpp +++ b/indra/newview/llworld.cpp @@ -794,10 +794,10 @@ void LLWorld::updateNetStats() S32 packets_in = gMessageSystem->mPacketsIn - mLastPacketsIn; S32 packets_out = gMessageSystem->mPacketsOut - mLastPacketsOut; - S32 packets_lost = gMessageSystem->mDroppedPackets - mLastPacketsLost; + S32 packets_lost = gMessageSystem->mLostPackets - mLastPacketsLost; - F64Bits actual_in_bits(gMessageSystem->mPacketRing.getAndResetActualInBits()); - F64Bits actual_out_bits(gMessageSystem->mPacketRing.getAndResetActualOutBits()); + F64Bits actual_in_bits(gMessageSystem->getAndResetActualInBits()); + F64Bits actual_out_bits(gMessageSystem->getAndResetActualOutBits()); add(LLStatViewer::MESSAGE_SYSTEM_DATA_IN, actual_in_bits); add(LLStatViewer::MESSAGE_SYSTEM_DATA_OUT, actual_out_bits); @@ -815,7 +815,7 @@ void LLWorld::updateNetStats() mLastPacketsIn = gMessageSystem->mPacketsIn; mLastPacketsOut = gMessageSystem->mPacketsOut; - mLastPacketsLost = gMessageSystem->mDroppedPackets; + mLastPacketsLost = gMessageSystem->mLostPackets; } @@ -838,7 +838,7 @@ void LLWorld::printPacketsLost() << " packets lost: " << cdp->getPacketsLost() << LL_ENDL; } } - LL_INFOS() << "Packets dropped by Packet Ring: " << gMessageSystem->mPacketRing.getNumDroppedPackets() << LL_ENDL; + LL_INFOS() << "Packets dropped by Packet Ring: " << gMessageSystem->getTotalNumDroppedPackets() << LL_ENDL; } void LLWorld::processCoarseUpdate(LLMessageSystem* msg, void** user_data)