Skip to content
Open
21 changes: 21 additions & 0 deletions gbn/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ func WithBoostPercent(boostPercent float32) TimeoutOptions {
}
}

// WithDynamicPongTimeout enables dynamic pong timeout based on observed RTT.
// When enabled, the pong timeout is computed as max(basePongTime,
// pongMultiplier * smoothedRTT), capped at maxPongTime. This is useful for
// relay-based transports where the round-trip time through the relay can vary
// significantly based on network conditions.
func WithDynamicPongTimeout(pongMultiplier int,
	maxPongTime time.Duration) TimeoutOptions {

	return func(m *TimeoutManager) {
		m.dynamicPongTime = true

		// Non-positive arguments are ignored so that the manager's
		// existing defaults remain in effect.
		if maxPongTime > 0 {
			m.maxPongTime = maxPongTime
		}

		if pongMultiplier > 0 {
			m.pongMultiplier = pongMultiplier
		}
	}
}

// config holds the configuration values for an instance of GoBackNConn.
type config struct {
// n is the window size. The sender can send a maximum of n packets
Expand Down
7 changes: 5 additions & 2 deletions gbn/gbn_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,11 @@ func (g *GoBackNConn) sendPacketsForever() error {
default:
}

// Start the pong timer.
g.pongTicker.Reset()
// Start the pong timer. We use ResetWithInterval
// to pick up any dynamic pong timeout changes
// based on observed RTT.
pongTime := g.timeoutManager.GetPongTime()
g.pongTicker.ResetWithInterval(pongTime)
Copy link
Copy Markdown
Contributor

@ViktorT-11 ViktorT-11 Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we have an issue here now as g.pongTicker can now exceed the value of g.pingTicker. If that happens, the <-g.pingTicker.Ticks() case in the outer select may be triggered prior to <-g.pongTicker.Ticks(). This could result in the connection remaining alive when it should have been closed.

If we’re dynamically increasing pongTicker, I believe we’ll need to apply the same logic to g.pingTicker. Otherwise, we may need to refactor the sendPacketsForever function to handle this more reliably.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified to clamp this value, added some additional tests here as well.

g.pongTicker.Resume()

// Also reset the ping timer.
Expand Down
259 changes: 259 additions & 0 deletions gbn/gbn_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,265 @@ func TestPayloadSplitting(t *testing.T) {
require.True(t, bytes.Equal(msg, payload1))
}

// TestBackwardsCompatMixedTimeouts ensures that a new client with increased
// ping/pong timeouts and dynamic pong can communicate with an old server using
// the original static timeout values. GBN timeouts are configured independently
// on each side and are not negotiated, so mixed versions should be compatible.
func TestBackwardsCompatMixedTimeouts(t *testing.T) {
	// In-memory duplex transport. The server writes to s1Chan and reads
	// from s2Chan; the client does the reverse. So s1Chan carries
	// server-to-client bytes and s2Chan carries client-to-server bytes.
	s1Chan := make(chan []byte, 10)
	s2Chan := make(chan []byte, 10)

	// Note: on context cancellation these helpers fall through and return
	// (nil, nil) rather than ctx.Err(), i.e. cancellation is surfaced as an
	// empty read/no-op write, not as an error.
	s1Read := func(ctx context.Context) ([]byte, error) {
		select {
		case val := <-s1Chan:
			return val, nil
		case <-ctx.Done():
		}
		return nil, nil
	}

	s1Write := func(ctx context.Context, b []byte) error {
		select {
		case s1Chan <- b:
			return nil
		case <-ctx.Done():
		}
		return nil
	}

	s2Read := func(ctx context.Context) ([]byte, error) {
		select {
		case val := <-s2Chan:
			return val, nil
		case <-ctx.Done():
		}
		return nil, nil
	}

	s2Write := func(ctx context.Context, b []byte) error {
		select {
		case s2Chan <- b:
			return nil
		case <-ctx.Done():
		}
		return nil
	}

	// Old server timeouts (pre-fix values).
	oldServerOpts := []Option{
		WithTimeoutOptions(
			WithKeepalivePing(
				5*time.Second, 3*time.Second,
			),
		),
	}

	// New client timeouts (post-fix values with dynamic pong).
	newClientOpts := []Option{
		WithTimeoutOptions(
			WithKeepalivePing(
				10*time.Second, 5*time.Second,
			),
			WithDynamicPongTimeout(3, 15*time.Second),
		),
	}

	ctx := context.Background()

	var (
		server *GoBackNConn
		wg     sync.WaitGroup
		srvErr error
	)

	// NewServerConn presumably blocks until the client completes the
	// handshake, so it must run concurrently with the client dial below.
	// server/srvErr are only read after wg.Wait(), so there is no race.
	wg.Add(1)
	go func() {
		defer wg.Done()

		server, srvErr = NewServerConn(
			ctx, s1Write, s2Read, oldServerOpts...,
		)
	}()

	// Give the server goroutine a moment to start before dialing.
	// NOTE(review): sleep-based synchronization can be flaky under load;
	// a readiness signal would be more robust.
	time.Sleep(200 * time.Millisecond)

	client, err := NewClientConn(
		ctx, 2, s2Write, s1Read, newClientOpts...,
	)
	require.NoError(t, err)

	wg.Wait()
	require.NoError(t, srvErr)

	defer func() {
		client.Close()
		server.Close()
	}()

	// Verify bidirectional communication works with mixed timeouts.
	payload1 := []byte("new client -> old server")
	payload2 := []byte("old server -> new client")

	// Send from the server in a goroutine so both directions can be in
	// flight at once; the error is collected after both Recv calls.
	sendErrCh := make(chan error, 1)
	go func() {
		sendErrCh <- server.Send(payload2)
	}()

	err = client.Send(payload1)
	require.NoError(t, err)

	msg, err := server.Recv()
	require.NoError(t, err)
	require.True(t, bytes.Equal(msg, payload1))

	msg, err = client.Recv()
	require.NoError(t, err)
	require.True(t, bytes.Equal(msg, payload2))

	require.NoError(t, <-sendErrCh)

	// Send multiple messages to exercise the RTT tracking that feeds the
	// dynamic pong timeout.
	for i := 0; i < 5; i++ {
		payload := []byte("round trip " + string(rune('0'+i)))

		// This sendErrCh deliberately shadows the outer one; each
		// iteration uses a fresh channel. payload is declared inside
		// the loop, so the goroutine captures a per-iteration variable.
		sendErrCh := make(chan error, 1)
		go func() {
			sendErrCh <- server.Send(payload)
		}()

		err = client.Send(payload)
		require.NoError(t, err)

		msg, err = server.Recv()
		require.NoError(t, err)
		require.True(t, bytes.Equal(msg, payload))

		msg, err = client.Recv()
		require.NoError(t, err)
		require.True(t, bytes.Equal(msg, payload))

		require.NoError(t, <-sendErrCh)
	}
}

// TestBackwardsCompatOldClientNewServer tests the reverse direction: an old
// client with the original timeout values connecting to a new server with
// increased timeouts and dynamic pong.
func TestBackwardsCompatOldClientNewServer(t *testing.T) {
	// In-memory duplex transport: the server writes to s1Chan and reads
	// from s2Chan, the client does the reverse.
	s1Chan := make(chan []byte, 10)
	s2Chan := make(chan []byte, 10)

	// On context cancellation these helpers return (nil, nil) / nil rather
	// than ctx.Err(), so cancellation shows up as an empty read or no-op
	// write instead of an error.
	s1Read := func(ctx context.Context) ([]byte, error) {
		select {
		case val := <-s1Chan:
			return val, nil
		case <-ctx.Done():
		}
		return nil, nil
	}

	s1Write := func(ctx context.Context, b []byte) error {
		select {
		case s1Chan <- b:
			return nil
		case <-ctx.Done():
		}
		return nil
	}

	s2Read := func(ctx context.Context) ([]byte, error) {
		select {
		case val := <-s2Chan:
			return val, nil
		case <-ctx.Done():
		}
		return nil, nil
	}

	s2Write := func(ctx context.Context, b []byte) error {
		select {
		case s2Chan <- b:
			return nil
		case <-ctx.Done():
		}
		return nil
	}

	// New server timeouts (post-fix values with dynamic pong).
	newServerOpts := []Option{
		WithTimeoutOptions(
			WithKeepalivePing(
				8*time.Second, 5*time.Second,
			),
			WithDynamicPongTimeout(3, 15*time.Second),
		),
	}

	// Old client timeouts (pre-fix values).
	oldClientOpts := []Option{
		WithTimeoutOptions(
			WithKeepalivePing(
				7*time.Second, 3*time.Second,
			),
		),
	}

	ctx := context.Background()

	var (
		server *GoBackNConn
		wg     sync.WaitGroup
		srvErr error
	)

	// Run the server accept concurrently since it presumably blocks until
	// the client handshake completes. server/srvErr are only read after
	// wg.Wait(), so there is no data race.
	wg.Add(1)
	go func() {
		defer wg.Done()

		server, srvErr = NewServerConn(
			ctx, s1Write, s2Read, newServerOpts...,
		)
	}()

	// Give the server goroutine a head start before dialing.
	// NOTE(review): sleep-based synchronization can be flaky; a readiness
	// signal would be more robust.
	time.Sleep(200 * time.Millisecond)

	client, err := NewClientConn(
		ctx, 2, s2Write, s1Read, oldClientOpts...,
	)
	require.NoError(t, err)

	wg.Wait()
	require.NoError(t, srvErr)

	defer func() {
		client.Close()
		server.Close()
	}()

	// Verify bidirectional communication works.
	payload1 := []byte("old client -> new server")
	payload2 := []byte("new server -> old client")

	// Send from the server in a goroutine so both directions are in
	// flight concurrently; the error is checked after both Recv calls.
	sendErrCh := make(chan error, 1)
	go func() {
		sendErrCh <- server.Send(payload2)
	}()

	err = client.Send(payload1)
	require.NoError(t, err)

	msg, err := server.Recv()
	require.NoError(t, err)
	require.True(t, bytes.Equal(msg, payload1))

	msg, err = client.Recv()
	require.NoError(t, err)
	require.True(t, bytes.Equal(msg, payload2))

	require.NoError(t, <-sendErrCh)
}

func setUpClientServerConns(t *testing.T, n uint8,
cRead, sRead func(ctx context.Context) ([]byte, error),
cWrite, sWrite func(ctx context.Context, b []byte) error,
Expand Down
2 changes: 1 addition & 1 deletion gbn/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func Deserialize(b []byte) (Message, error) {

switch b[0] {
case DATA:
if len(b) < 3 {
if len(b) < 4 {
return nil, io.EOF
}
return &PacketData{
Expand Down
Loading
Loading