From f9ee579dcbaa4bf803194279ea76a3080802487a Mon Sep 17 00:00:00 2001 From: Sushan Bhattarai Date: Mon, 16 Mar 2026 14:02:34 -0400 Subject: [PATCH] for experiment --- bigtable/direct_access_check.go | 65 +++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/bigtable/direct_access_check.go b/bigtable/direct_access_check.go index 95ca6ef36e0d..abd74731aaf1 100644 --- a/bigtable/direct_access_check.go +++ b/bigtable/direct_access_check.go @@ -19,11 +19,16 @@ package bigtable import ( "context" "fmt" + "log" "strings" + "sync" + "time" + internal "cloud.google.com/go/bigtable/internal/transport" "google.golang.org/api/option" "google.golang.org/api/option/internaloption" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/google" "google.golang.org/grpc/peer" ) @@ -117,3 +122,63 @@ func CheckDirectAccessSupported(ctx context.Context, project, instance, appProfi return isDirectPathUsed, nil } + +// CallSingleChannel connects to Bigtable using the DirectPath C2P scheme, +// and continuously maintains 200 in-flight Prime requests on the same channel +// until the provided context is canceled. +func CallSingleChannel(ctx context.Context, project, instance, appProfile, target string, requestsInFlight int) error { + fullInstanceName := fmt.Sprintf("projects/%s/instances/%s", project, instance) + + log.Printf("Creating single channel to %s", target) + + // 1. Create the gRPC Client + conn, err := grpc.NewClient(target, + grpc.WithCredentialsBundle(google.NewDefaultCredentials()), + ) + if err != nil { + return fmt.Errorf("failed to create client for C2P target %s: %w", target, err) + } + defer conn.Close() + + // 2. Wrap it in the internal BigtableConn + btc := internal.NewBigtableConn(conn) + + // 3. Maintain 200 in-flight requests continuously + log.Printf("Starting %d workers to maintain continuous in-flight Prime() requests...", requestsInFlight) + + var wg sync.WaitGroup + + for i := 0; i < requestsInFlight; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + + // Infinite loop to keep firing requests + for { + // Exit the loop if the parent context is canceled + select { + case <-ctx.Done(): + return + default: + } + + // Execute a single request with its own 10s timeout + primeCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + ffMd := createFeatureFlagsMD(true, false, true) + err := btc.Prime(primeCtx, fullInstanceName, appProfile, ffMd) + cancel() // Always cancel to prevent context leaks + + if err != nil { + log.Printf("[Worker %d] Prime() failed: %v", workerID, err) + // Tiny backoff to prevent CPU thrashing if the connection fully drops + } + } + }(i) + } + + // 4. Block until the context is canceled and all workers exit + wg.Wait() + + log.Println("Stopped maintaining requests. Exiting CallSingleChannel.") + return ctx.Err() +}