Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
*iml
.DS_Store

#claude
.claude
CLAUDE.md

# gradle
.gradle
build
Expand Down Expand Up @@ -57,3 +61,6 @@ Wallet

/framework/propPath
.cache

# local security scan reports (pre-commit hook output)
.security-reports/
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,15 @@ private void throwTronError(String strategy, String params, String servlet, Exc
@Override
protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {

RuntimeData runtimeData = new RuntimeData(req);
GlobalRateLimiter.acquire(runtimeData);

RuntimeData runtimeData = new RuntimeData(req);
IRateLimiter rateLimiter = container.get(KEY_PREFIX_HTTP, getClass().getSimpleName());

boolean acquireResource = true;
// Check per-endpoint first to avoid consuming global IP/QPS quota for requests
// that would be rejected by the per-endpoint limiter anyway.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData);

if (rateLimiter != null) {
acquireResource = rateLimiter.acquire(runtimeData);
}
String contextPath = req.getContextPath();
String url = Strings.isNullOrEmpty(req.getServletPath())
? MetricLabels.UNDEFINED : contextPath + req.getServletPath();
Expand All @@ -119,7 +117,9 @@ protected void service(HttpServletRequest req, HttpServletResponse resp)
} catch (Exception unexpected) {
logger.error("Http Api {}, Method:{}. Error:", url, req.getMethod(), unexpected);
} finally {
if (rateLimiter instanceof IPreemptibleRateLimiter && acquireResource) {
// Release whenever the per-endpoint permit was acquired (covers both the normal
// completion path and the case where GlobalRateLimiter rejected the request).
if (rateLimiter instanceof IPreemptibleRateLimiter && perEndpointAcquired) {
((IPreemptibleRateLimiter) rateLimiter).release();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.tron.core.config.args.Args;

Expand All @@ -18,18 +19,22 @@ public class GlobalRateLimiter {

private static RateLimiter rateLimiter = RateLimiter.create(QPS);

public static void acquire(RuntimeData runtimeData) {
rateLimiter.acquire();
public static boolean tryAcquire(RuntimeData runtimeData) {
String ip = runtimeData.getRemoteAddr();
if (Strings.isNullOrEmpty(ip)) {
return;
if (!Strings.isNullOrEmpty(ip)) {
RateLimiter r;
try {
// cache.get is atomic: only one loader executes per key under concurrent requests,
// preventing multiple RateLimiter instances from being created for the same IP.
r = cache.get(ip, () -> RateLimiter.create(IP_QPS));
} catch (ExecutionException e) {
r = RateLimiter.create(IP_QPS);
}
if (!r.tryAcquire()) {
return false;
}
}
RateLimiter r = cache.getIfPresent(ip);
if (r == null) {
r = RateLimiter.create(IP_QPS);
cache.put(ip, r);
}
r.acquire();
return rateLimiter.tryAcquire();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -104,44 +104,49 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
IRateLimiter rateLimiter = container
.get(KEY_PREFIX_RPC, call.getMethodDescriptor().getFullMethodName());

RuntimeData runtimeData = new RuntimeData(call);
GlobalRateLimiter.acquire(runtimeData);

boolean acquireResource = true;
Listener<ReqT> listener = new ServerCall.Listener<ReqT>() {};

if (rateLimiter != null) {
acquireResource = rateLimiter.acquire(runtimeData);
RuntimeData runtimeData = new RuntimeData(call);
// Check per-endpoint first to avoid consuming global IP/QPS quota for requests
// that would be rejected by the per-endpoint limiter anyway.
boolean perEndpointAcquired = rateLimiter == null || rateLimiter.tryAcquire(runtimeData);
boolean acquireResource = perEndpointAcquired && GlobalRateLimiter.tryAcquire(runtimeData);

if (!acquireResource) {
// Release the per-endpoint permit when global rejected, to avoid semaphore leak.
if (rateLimiter instanceof IPreemptibleRateLimiter && perEndpointAcquired) {
((IPreemptibleRateLimiter) rateLimiter).release();
}
call.close(Status.fromCode(Code.RESOURCE_EXHAUSTED), new Metadata());
return listener;
}

Listener<ReqT> listener = new ServerCall.Listener<ReqT>() {
};

try {
if (acquireResource) {
call.setMessageCompression(true);
ServerCall.Listener<ReqT> delegate = next.startCall(call, headers);

listener = new SimpleForwardingServerCallListener<ReqT>(delegate) {
@Override
public void onComplete() {
// must release the permit to avoid the leak of permit.
if (rateLimiter instanceof IPreemptibleRateLimiter) {
((IPreemptibleRateLimiter) rateLimiter).release();
}
call.setMessageCompression(true);
ServerCall.Listener<ReqT> delegate = next.startCall(call, headers);

listener = new SimpleForwardingServerCallListener<ReqT>(delegate) {
@Override
public void onComplete() {
// must release the permit to avoid the leak of permit.
if (rateLimiter instanceof IPreemptibleRateLimiter) {
((IPreemptibleRateLimiter) rateLimiter).release();
}
}

@Override
public void onCancel() {
// must release the permit to avoid the leak of permit.
if (rateLimiter instanceof IPreemptibleRateLimiter) {
((IPreemptibleRateLimiter) rateLimiter).release();
}
@Override
public void onCancel() {
// must release the permit to avoid the leak of permit.
if (rateLimiter instanceof IPreemptibleRateLimiter) {
((IPreemptibleRateLimiter) rateLimiter).release();
}
};
} else {
call.close(Status.fromCode(Code.RESOURCE_EXHAUSTED), new Metadata());
}
}
};
} catch (Exception e) {
// next.startCall() failed — release the permit that was already acquired.
if (rateLimiter instanceof IPreemptibleRateLimiter) {
((IPreemptibleRateLimiter) rateLimiter).release();
}
String grpcFailMeterName = MetricsKey.NET_API_DETAIL_FAIL_QPS
+ call.getMethodDescriptor().getFullMethodName();
MetricsUtil.meterMark(MetricsKey.NET_API_FAIL_QPS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public DefaultBaseQqsAdapter(String paramString) {
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire();
public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public void release() {
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire();
public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public IPQPSRateLimiterAdapter(String paramString) {
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire(data.getRemoteAddr());
public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire(data.getRemoteAddr());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

public interface IRateLimiter {

boolean acquire(RuntimeData data);
boolean tryAcquire(RuntimeData data);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ public QpsRateLimiterAdapter(String paramString) {
}

@Override
public boolean acquire(RuntimeData data) {
return strategy.acquire();
public boolean tryAcquire(RuntimeData data) {
return strategy.tryAcquire();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,11 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public class GlobalPreemptibleStrategy extends Strategy {

public static final String STRATEGY_PARAM_PERMIT = "permit";
public static final int DEFAULT_PERMIT_NUM = 1;
public static final int DEFAULT_ACQUIRE_TIMEOUT = 2;

private Semaphore sp;

public GlobalPreemptibleStrategy(String paramString) {
Expand All @@ -29,20 +23,13 @@ protected Map<String, ParamItem> defaultParam() {
return map;
}

public boolean acquire() {

try {
if (!sp.tryAcquire(DEFAULT_ACQUIRE_TIMEOUT, TimeUnit.SECONDS)) {
throw new RuntimeException();
}

} catch (InterruptedException e) {
logger.error("acquire permit with error: {}", e.getMessage());
Thread.currentThread().interrupt();
} catch (RuntimeException e1) {
return false;
}
return true;
// Non-blocking: immediately rejects if no permit is available.
// Intentional change from the previous tryAcquire(2, TimeUnit.SECONDS) behaviour:
// blocking the caller for up to 2 s ties up Netty IO / gRPC executor threads and
// masks overload rather than shedding it. All rate-limiting in this stack is now
// non-blocking to keep the thread model consistent with GlobalRateLimiter.
public boolean tryAcquire() {
return sp.tryAcquire();
}

public void release() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.google.common.util.concurrent.RateLimiter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class IPQpsStrategy extends Strategy {
Expand All @@ -19,14 +20,16 @@ public IPQpsStrategy(String paramString) {
super(paramString);
}

public boolean acquire(String ip) {
RateLimiter limiter = ipLimiter.getIfPresent(ip);
if (limiter == null) {
public boolean tryAcquire(String ip) {
RateLimiter limiter;
try {
// cache.get is atomic: only one loader executes per key under concurrent requests,
// preventing multiple RateLimiter instances from being created for the same IP.
limiter = ipLimiter.get(ip, this::newRateLimiter);
} catch (ExecutionException e) {
limiter = newRateLimiter();
ipLimiter.put(ip, limiter);
}
limiter.acquire();
return true;
return limiter.tryAcquire();
}

private RateLimiter newRateLimiter() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ protected Map<String, ParamItem> defaultParam() {
return map;
}

public boolean acquire() {
rateLimiter.acquire();
return true;
public boolean tryAcquire() {
return rateLimiter.tryAcquire();
}
}
Loading