Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,15 @@ func (c *grpcStorageClient) prepareDirectPathMetadata(ctx context.Context, targe
}

func (c *grpcStorageClient) Close() error {
// Close the underlying gRPC connection first.
// This ensures that any active streams are cleanly finalized, allowing
// their stats handlers to flush `End` events before the metrics pipeline
// is torn down.
err := c.raw.Close()
if c.settings.metricsContext != nil {
c.settings.metricsContext.close()
}
return c.raw.Close()
return err
}

// Top-level methods.
Expand Down
13 changes: 10 additions & 3 deletions storage/grpc_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ type metricsContext struct {
clientOpts []option.ClientOption
// instance of metric reader used by gRPC client-side metrics
provider *metric.MeterProvider
// true if the provider was created by the SDK and should be shut down here
isDefaultProvider bool
// clean func to call when closing gRPC client
close func()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The close function should return an error to propagate the result of provider.Shutdown. The Shutdown method on MeterProvider can return an error (e.g., if the shutdown context times out) which should be handled by the client's Close method.

Suggested change
close func()
close func() error

}
Expand Down Expand Up @@ -173,9 +175,11 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte
meterOpts = append(meterOpts, metric.WithReader(
metric.NewPeriodicReader(&exporterLogSuppressor{Exporter: exporter}, metric.WithInterval(interval))))
}
isDefault := false
provider := cfg.meterProvider
if provider == nil {
provider = metric.NewMeterProvider(meterOpts...)
isDefault = true
}
mo := opentelemetry.MetricsOptions{
MeterProvider: provider,
Expand Down Expand Up @@ -204,10 +208,13 @@ func newGRPCMetricContext(ctx context.Context, cfg metricsConfig) (*metricsConte
grpc.WithDefaultCallOptions(grpc.StaticMethodCallOption{})),
}
return &metricsContext{
clientOpts: opts,
provider: provider,
clientOpts: opts,
provider: provider,
isDefaultProvider: isDefault,
close: func() {
provider.Shutdown(ctx)
if isDefault {
provider.Shutdown(ctx)
}
},
Comment on lines 214 to 218
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are two issues with the current implementation of close:

  1. The function captures ctx from newGRPCMetricContext. This context is associated with client initialization and is likely to be expired when client.Close() is called. This would cause provider.Shutdown(ctx) to fail. A new context should be created for the shutdown process.
  2. The error returned by provider.Shutdown is ignored. This error should be returned to the caller.

Note that after this change, grpcStorageClient.Close() will also need to be updated to handle the error returned from metricsContext.close().

		close: func() error {
			if isDefault {
				// Use a background context with a timeout for shutdown to ensure
				// it can complete even if the original context is expired.
				shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
				defer cancel()
				return provider.Shutdown(shutdownCtx)
			}
			return nil
		},

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix this?

}, nil
}
Expand Down
Loading