65 changes: 65 additions & 0 deletions bigtable/direct_access_check.go
@@ -19,11 +19,16 @@ package bigtable
import (
	"context"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	internal "cloud.google.com/go/bigtable/internal/transport"
	"google.golang.org/api/option"
	"google.golang.org/api/option/internaloption"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/google"
	"google.golang.org/grpc/peer"
)

@@ -117,3 +122,63 @@ func CheckDirectAccessSupported(ctx context.Context, project, instance, appProfi

	return isDirectPathUsed, nil
}

// CallSingleChannel connects to Bigtable using the DirectPath C2P scheme,
// and continuously maintains 200 in-flight Prime requests on the same channel
Contributor

medium

The function comment hardcodes '200' as the number of in-flight requests. However, this is determined by the requestsInFlight parameter. The comment should be updated to reflect that this value is configurable to avoid confusion.

Suggested change
-// and continuously maintains 200 in-flight Prime requests on the same channel
+// and continuously maintains a configurable number of in-flight Prime requests on the same channel

// until the provided context is canceled.
func CallSingleChannel(ctx context.Context, project, instance, appProfile, target string, requestsInFlight int) error {
	fullInstanceName := fmt.Sprintf("projects/%s/instances/%s", project, instance)

	log.Printf("Creating single channel to %s", target)

	// 1. Create the gRPC Client
	conn, err := grpc.NewClient(target,
		grpc.WithCredentialsBundle(google.NewDefaultCredentials()),
	)
	if err != nil {
		return fmt.Errorf("failed to create client for C2P target %s: %w", target, err)
	}
	defer conn.Close()

	// 2. Wrap it in the internal BigtableConn
	btc := internal.NewBigtableConn(conn)

	// 3. Maintain 200 in-flight requests continuously
	log.Printf("Starting %d workers to maintain continuous in-flight Prime() requests...", requestsInFlight)

	var wg sync.WaitGroup

	for i := 0; i < requestsInFlight; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()

			// Infinite loop to keep firing requests
			for {
				// Exit the loop if the parent context is canceled
				select {
				case <-ctx.Done():
					return
				default:
				}

				// Execute a single request with its own 10s timeout
				primeCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
				ffMd := createFeatureFlagsMD(true, false, true)
				err := btc.Prime(primeCtx, fullInstanceName, appProfile, ffMd)
				cancel() // Always cancel to prevent context leaks

				if err != nil {
					log.Printf("[Worker %d] Prime() failed: %v", workerID, err)
					// Tiny backoff to prevent CPU thrashing if the connection fully drops
Comment on lines +172 to +173
Contributor

high

The comment on line 173 indicates an intention to add a backoff, but it's not implemented. Without a backoff, the loop could spin and consume high CPU if Prime() fails repeatedly. This suggestion adds a small time.Sleep() to implement the backoff as intended.

Suggested change
-					log.Printf("[Worker %d] Prime() failed: %v", workerID, err)
-					// Tiny backoff to prevent CPU thrashing if the connection fully drops
+					log.Printf("[Worker %d] Prime() failed: %v", workerID, err)
+					time.Sleep(100 * time.Millisecond) // Tiny backoff to prevent CPU thrashing if the connection fully drops

				}
			}
		}(i)
	}

	// 4. Block until the context is canceled and all workers exit
	wg.Wait()

	log.Println("Stopped maintaining requests. Exiting CallSingleChannel.")
	return ctx.Err()
}
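
For context, here is a minimal usage sketch of CallSingleChannel, assuming the PR is merged. The project, instance, and app profile IDs are hypothetical placeholders, and the google-c2p target URI is an assumption about how a DirectPath C2P endpoint would be addressed; a signal-aware context lets every worker observe ctx.Done() and drain cleanly.

package main

import (
	"context"
	"errors"
	"log"
	"os/signal"
	"syscall"

	"cloud.google.com/go/bigtable"
)

func main() {
	// Cancel the context on SIGINT/SIGTERM so every worker's select on
	// ctx.Done() fires and CallSingleChannel can return.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	err := bigtable.CallSingleChannel(ctx,
		"my-project",  // hypothetical project ID
		"my-instance", // hypothetical instance ID
		"default",     // hypothetical app profile
		"google-c2p:///bigtable.googleapis.com", // assumed C2P target
		200, // requestsInFlight
	)
	if err != nil && !errors.Is(err, context.Canceled) {
		log.Fatalf("CallSingleChannel failed: %v", err)
	}
}

Because the function returns ctx.Err() rather than nil after a clean shutdown, the sketch filters context.Canceled before treating the result as a failure.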