Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions api/src/main/java/io/grpc/ForwardingServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ public Server build() {
return delegate().build();
}

@Override
public T addMetricSink(MetricSink metricSink) {
delegate().addMetricSink(metricSink);
return thisT();
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("delegate", delegate()).toString();
Expand Down
104 changes: 104 additions & 0 deletions api/src/main/java/io/grpc/InternalTcpMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright 2026 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
* TCP Metrics defined to be shared across transport implementations.
*/
@Internal
public final class InternalTcpMetrics {

private InternalTcpMetrics() {}

private static final List<String> OPTIONAL_LABELS = Arrays.asList(
"network.local.address",
"network.local.port",
"network.peer.address",
"network.peer.port");

public static final DoubleHistogramMetricInstrument MIN_RTT_INSTRUMENT =
MetricInstrumentRegistry.getDefaultRegistry()
.registerDoubleHistogram(
"grpc.tcp.min_rtt",
"Minimum round-trip time of a TCP connection",
"s",
getMinRttBuckets(),
Collections.emptyList(),
OPTIONAL_LABELS,
false);

public static final LongCounterMetricInstrument CONNECTIONS_CREATED_INSTRUMENT =
MetricInstrumentRegistry
.getDefaultRegistry()
.registerLongCounter(
"grpc.tcp.connections_created",
"The total number of TCP connections established.",
"{connection}",
Collections.emptyList(),
OPTIONAL_LABELS,
false);

public static final LongUpDownCounterMetricInstrument CONNECTION_COUNT_INSTRUMENT =
MetricInstrumentRegistry
.getDefaultRegistry()
.registerLongUpDownCounter(
"grpc.tcp.connection_count",
"The current number of active TCP connections.",
"{connection}",
Collections.emptyList(),
OPTIONAL_LABELS,
false
);

public static final LongCounterMetricInstrument PACKETS_RETRANSMITTED_INSTRUMENT =
MetricInstrumentRegistry
.getDefaultRegistry()
.registerLongCounter(
"grpc.tcp.packets_retransmitted",
"The total number of packets retransmitted for all TCP connections.",
"{packet}",
Collections.emptyList(),
OPTIONAL_LABELS,
false
);

public static final LongCounterMetricInstrument RECURRING_RETRANSMITS_INSTRUMENT =
MetricInstrumentRegistry
.getDefaultRegistry()
.registerLongCounter(
"grpc.tcp.recurring_retransmits",
"The total number of times the retransmit timer popped for all TCP"
+ " connections.",
"{timeout}",
Collections.emptyList(),
OPTIONAL_LABELS,
false
);

private static List<Double> getMinRttBuckets() {
List<Double> buckets = new ArrayList<>(100);
for (int i = 1; i <= 100; i++) {
buckets.add(1e-6 * Math.pow(2.0, i * 0.24));
}
return Collections.unmodifiableList(buckets);
}
}
3 changes: 1 addition & 2 deletions api/src/main/java/io/grpc/NameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ public static final class Args {
@Nullable private final ChannelLogger channelLogger;
@Nullable private final Executor executor;
@Nullable private final String overrideAuthority;
@Nullable private final MetricRecorder metricRecorder;
private final MetricRecorder metricRecorder;
@Nullable private final NameResolverRegistry nameResolverRegistry;
@Nullable private final IdentityHashMap<Key<?>, Object> customArgs;

Expand Down Expand Up @@ -497,7 +497,6 @@ public String getOverrideAuthority() {
/**
* Returns the {@link MetricRecorder} that the channel uses to record metrics.
*/
@Nullable
public MetricRecorder getMetricRecorder() {
return metricRecorder;
}
Expand Down
10 changes: 10 additions & 0 deletions api/src/main/java/io/grpc/ServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,16 @@ public T setBinaryLog(BinaryLog binaryLog) {
*/
public abstract Server build();

/**
* Adds a metric sink to the server.
*
* @param metricSink the metric sink to add.
* @return this
*/
public T addMetricSink(MetricSink metricSink) {
throw new UnsupportedOperationException();
}

/**
* Returns the correctly typed version of the builder.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private BinderServerBuilder(

serverImplBuilder =
new ServerImplBuilder(
streamTracerFactories -> {
(streamTracerFactories, metricRecorder) -> {
internalBuilder.setStreamTracerFactories(streamTracerFactories);
BinderServer server = internalBuilder.build();
BinderInternal.setIBinder(binderReceiver, server.getHostBinder());
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/io/grpc/internal/ClientTransportFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.ChannelCredentials;
import io.grpc.ChannelLogger;
import io.grpc.HttpConnectProxiedSocketAddress;
import io.grpc.MetricRecorder;
import java.io.Closeable;
import java.net.SocketAddress;
import java.util.Collection;
Expand Down Expand Up @@ -91,6 +92,8 @@ final class ClientTransportOptions {
private Attributes eagAttributes = Attributes.EMPTY;
@Nullable private String userAgent;
@Nullable private HttpConnectProxiedSocketAddress connectProxiedSocketAddr;
private MetricRecorder metricRecorder = new MetricRecorder() {
};

public ChannelLogger getChannelLogger() {
return channelLogger;
Expand All @@ -101,6 +104,15 @@ public ClientTransportOptions setChannelLogger(ChannelLogger channelLogger) {
return this;
}

public MetricRecorder getMetricRecorder() {
return metricRecorder;
}

public ClientTransportOptions setMetricRecorder(MetricRecorder metricRecorder) {
this.metricRecorder = Preconditions.checkNotNull(metricRecorder, "metricRecorder");
return this;
}

public String getAuthority() {
return authority;
}
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/internal/InternalSubchannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ final class InternalSubchannel implements InternalInstrumented<ChannelStats>, Tr
private final InternalChannelz channelz;
private final CallTracer callsTracer;
private final ChannelTracer channelTracer;
private final MetricRecorder metricRecorder;
private final ChannelLogger channelLogger;
private final boolean reconnectDisabled;

Expand Down Expand Up @@ -191,6 +192,7 @@ protected void handleNotInUse() {
this.scheduledExecutor = scheduledExecutor;
this.connectingTimer = stopwatchSupplier.get();
this.syncContext = syncContext;
this.metricRecorder = metricRecorder;
this.callback = callback;
this.channelz = channelz;
this.callsTracer = callsTracer;
Expand Down Expand Up @@ -265,6 +267,7 @@ private void startNewTransport() {
.setAuthority(eagChannelAuthority != null ? eagChannelAuthority : authority)
.setEagAttributes(currentEagAttributes)
.setUserAgent(userAgent)
.setMetricRecorder(metricRecorder)
.setHttpConnectProxiedSocketAddress(proxiedAddr);
TransportLogger transportLogger = new TransportLogger();
// In case the transport logs in the constructor, use the subchannel logId
Expand Down
21 changes: 19 additions & 2 deletions core/src/main/java/io/grpc/internal/ServerImplBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import io.grpc.HandlerRegistry;
import io.grpc.InternalChannelz;
import io.grpc.InternalConfiguratorRegistry;
import io.grpc.MetricInstrumentRegistry;
import io.grpc.MetricRecorder;
import io.grpc.MetricSink;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCallExecutorSupplier;
Expand Down Expand Up @@ -80,6 +83,7 @@ public static ServerBuilder<?> forPort(int port) {
final List<ServerTransportFilter> transportFilters = new ArrayList<>();
final List<ServerInterceptor> interceptors = new ArrayList<>();
private final List<ServerStreamTracer.Factory> streamTracerFactories = new ArrayList<>();
final List<MetricSink> metricSinks = new ArrayList<>();
private final ClientTransportServersBuilder clientTransportServersBuilder;
HandlerRegistry fallbackRegistry = DEFAULT_FALLBACK_REGISTRY;
ObjectPool<? extends Executor> executorPool = DEFAULT_EXECUTOR_POOL;
Expand All @@ -104,7 +108,8 @@ public static ServerBuilder<?> forPort(int port) {
*/
public interface ClientTransportServersBuilder {
InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories);
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
MetricRecorder metricRecorder);
}

/**
Expand Down Expand Up @@ -157,6 +162,15 @@ public ServerImplBuilder intercept(ServerInterceptor interceptor) {
return this;
}

/**
* Adds a MetricSink to the server.
*/
@Override
public ServerImplBuilder addMetricSink(MetricSink metricSink) {
metricSinks.add(checkNotNull(metricSink, "metricSink"));
return this;
}

@Override
public ServerImplBuilder addStreamTracerFactory(ServerStreamTracer.Factory factory) {
streamTracerFactories.add(checkNotNull(factory, "factory"));
Expand Down Expand Up @@ -241,8 +255,11 @@ public void setDeadlineTicker(Deadline.Ticker ticker) {

@Override
public Server build() {
MetricRecorder metricRecorder = new MetricRecorderImpl(metricSinks,
MetricInstrumentRegistry.getDefaultRegistry());
return new ServerImpl(this,
clientTransportServersBuilder.buildClientTransportServers(getTracerFactories()),
clientTransportServersBuilder.buildClientTransportServers(
getTracerFactories(), metricRecorder),
Context.ROOT);
}

Expand Down
19 changes: 15 additions & 4 deletions core/src/test/java/io/grpc/internal/ServerImplBuilderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import io.grpc.InternalConfigurator;
import io.grpc.InternalConfiguratorRegistry;
import io.grpc.Metadata;
import io.grpc.MetricRecorder;
import io.grpc.MetricSink;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
Expand Down Expand Up @@ -73,7 +76,8 @@ public void setUp() throws Exception {
new ClientTransportServersBuilder() {
@Override
public InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
MetricRecorder metricRecorder) {
throw new UnsupportedOperationException();
}
});
Expand Down Expand Up @@ -128,6 +132,13 @@ public void getTracerFactories_disableBoth() {
assertThat(factories).containsExactly(DUMMY_USER_TRACER);
}

@Test
public void addMetricSink_addsToSinks() {
MetricSink mockSink = mock(MetricSink.class);
builder.addMetricSink(mockSink);
assertThat(builder.metricSinks).containsExactly(mockSink);
}

@Test
public void getTracerFactories_callsGet() throws Exception {
Class<?> runnable = classLoader.loadClass(StaticTestingClassLoaderCallsGet.class.getName());
Expand All @@ -139,7 +150,7 @@ public static final class StaticTestingClassLoaderCallsGet implements Runnable {
public void run() {
ServerImplBuilder builder =
new ServerImplBuilder(
streamTracerFactories -> {
(streamTracerFactories, metricRecorder) -> {
throw new UnsupportedOperationException();
});
assertThat(builder.getTracerFactories()).hasSize(2);
Expand Down Expand Up @@ -169,7 +180,7 @@ public void configureServerBuilder(ServerBuilder<?> builder) {
}));
ServerImplBuilder builder =
new ServerImplBuilder(
streamTracerFactories -> {
(streamTracerFactories, metricRecorder) -> {
throw new UnsupportedOperationException();
});
assertThat(builder.getTracerFactories()).containsExactly(DUMMY_USER_TRACER);
Expand All @@ -192,7 +203,7 @@ public void run() {
InternalConfiguratorRegistry.setConfigurators(Collections.emptyList());
ServerImplBuilder builder =
new ServerImplBuilder(
streamTracerFactories -> {
(streamTracerFactories, metricRecorder) -> {
throw new UnsupportedOperationException();
});
assertThat(builder.getTracerFactories()).isEmpty();
Expand Down
4 changes: 3 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.grpc.InternalServerInterceptors;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MetricRecorder;
import io.grpc.ServerCall;
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallExecutorSupplier;
Expand Down Expand Up @@ -206,7 +207,8 @@ public void startUp() throws IOException {
new ClientTransportServersBuilder() {
@Override
public InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
MetricRecorder metricRecorder) {
throw new UnsupportedOperationException();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.grpc.ExperimentalApi;
import io.grpc.ForwardingServerBuilder;
import io.grpc.Internal;
import io.grpc.MetricRecorder;
import io.grpc.ServerBuilder;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.FixedObjectPool;
Expand Down Expand Up @@ -120,7 +121,8 @@ private InProcessServerBuilder(SocketAddress listenAddress) {
final class InProcessClientTransportServersBuilder implements ClientTransportServersBuilder {
@Override
public InternalServer buildClientTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
List<? extends ServerStreamTracer.Factory> streamTracerFactories,
MetricRecorder metricRecorder) {
return buildTransportServers(streamTracerFactories);
}
}
Expand Down
1 change: 1 addition & 0 deletions netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,7 @@ public void run() {
localSocketPicker,
channelLogger,
useGetForSafeMethods,
options.getMetricRecorder(),
Ticker.systemTicker());
return transport;
}
Expand Down
Loading
Loading