From 27430c80f800c8e65699ae77531b049e6699b5e8 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 7 Apr 2026 22:08:38 +0530 Subject: [PATCH 1/2] Add QOS filtering for coordinator endpoints --- .../server/initialization/ServerConfig.java | 12 +++++++ .../jetty/CliIndexerServerModule.java | 8 +---- .../org/apache/druid/cli/CliCoordinator.java | 15 +++++++++ .../org/apache/druid/cli/CliOverlord.java | 32 +++++-------------- .../org/apache/druid/cli/CliOverlordTest.java | 23 ++++++------- 5 files changed, 48 insertions(+), 42 deletions(-) diff --git a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java index a7206ca9cf7e..2eff2f54a328 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java +++ b/server/src/main/java/org/apache/druid/server/initialization/ServerConfig.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.util.List; import java.util.Objects; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.zip.Deflater; @@ -441,6 +442,17 @@ public static int getDefaultNumThreads() return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + 30; } + public static int getNumThreadsFromProperties(Properties properties) + { + final String value = properties.getProperty("druid.server.http.numThreads"); + return value == null ? getDefaultNumThreads() : Integer.parseInt(value); + } + + public static int getDefaultMaxConcurrentRequests(int numThreads) + { + return Math.max(1, Math.max(numThreads - 4, (int) (numThreads * 0.8))); + } + public static class UriComplianceDeserializer extends JsonDeserializer { @Override diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java index c1c10547d880..f8768f3a7f04 100644 --- a/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java +++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/CliIndexerServerModule.java @@ -47,7 +47,6 @@ */ public class CliIndexerServerModule implements Module { - public static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads"; private final Properties properties; public CliIndexerServerModule(Properties properties) @@ -62,12 +61,7 @@ public void configure(Binder binder) LifecycleModule.register(binder, ChatHandlerResource.class); // Use an equal number of threads for chat handler and non-chat handler requests. - int serverHttpNumThreads; - if (properties.getProperty(SERVER_HTTP_NUM_THREADS_PROPERTY) == null) { - serverHttpNumThreads = ServerConfig.getDefaultNumThreads(); - } else { - serverHttpNumThreads = Integer.parseInt(properties.getProperty(SERVER_HTTP_NUM_THREADS_PROPERTY)); - } + int serverHttpNumThreads = ServerConfig.getNumThreadsFromProperties(properties); JettyBindings.addQosFilter( binder, diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java index 848dbd5da303..c93bb08f0a4b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java +++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java @@ -104,6 +104,8 @@ import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.http.ServersResource; import org.apache.druid.server.http.TiersResource; +import org.apache.druid.server.initialization.ServerConfig; +import org.apache.druid.server.initialization.jetty.JettyBindings; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; import org.apache.druid.server.lookup.cache.LookupCoordinatorManager; import org.apache.druid.server.lookup.cache.LookupCoordinatorManagerConfig; @@ -229,6 +231,19 @@ public void configure(Binder binder) binder.bind(JettyServerInitializer.class) .to(CoordinatorJettyServerInitializer.class); + // QoS filtering to prevent heavy coordinator API requests from starving health check endpoints. + // Set druid.coordinator.server.maxConcurrentRequests=-1 to disable. + final int serverHttpNumThreads = ServerConfig.getNumThreadsFromProperties(properties); + final int maxConcurrentRequests = properties.containsKey("druid.coordinator.server.maxConcurrentRequests") + ? Integer.parseInt(properties.getProperty("druid.coordinator.server.maxConcurrentRequests")) + : ServerConfig.getDefaultMaxConcurrentRequests(serverHttpNumThreads); + if (maxConcurrentRequests > 0) { + log.info("Coordinator QoS filtering enabled. Max concurrent requests: [%d]", maxConcurrentRequests); + JettyBindings.addQosFilter(binder, new String[]{"/druid/coordinator/v1/*", "/druid-internal/*"}, maxConcurrentRequests); + } else { + log.info("Coordinator QoS filtering disabled."); + } + Jerseys.addResource(binder, CoordinatorResource.class); Jerseys.addResource(binder, CoordinatorCompactionResource.class); Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index b96f299dfcd5..b1eb9e868ed6 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -131,7 +131,6 @@ import org.apache.druid.server.http.RedirectInfo; import org.apache.druid.server.http.SelfDiscoveryResource; import org.apache.druid.server.initialization.ServerConfig; -import org.apache.druid.server.initialization.jetty.CliIndexerServerModule; import org.apache.druid.server.initialization.jetty.JettyBindings; import org.apache.druid.server.initialization.jetty.JettyServerInitUtils; import org.apache.druid.server.initialization.jetty.JettyServerInitializer; @@ -490,27 +489,17 @@ private void configureOverlordWebResources(Binder binder) Jerseys.addResource(binder, OverlordDataSourcesResource.class); - final int serverHttpNumThreads = properties.containsKey(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY) - ? Integer.parseInt(properties.getProperty(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY)) - : ServerConfig.getDefaultNumThreads(); - - final int maxConcurrentActions; - if (properties.containsKey("druid.indexer.server.maxConcurrentActions")) { - maxConcurrentActions = Integer.parseInt(properties.getProperty("druid.indexer.server.maxConcurrentActions")); - } else { - maxConcurrentActions = getDefaultMaxConcurrentActions(serverHttpNumThreads); - } - + // QoS filtering to prevent action requests from starving health check endpoints. + // Set druid.indexer.server.maxConcurrentActions=-1 to disable. + final int serverHttpNumThreads = ServerConfig.getNumThreadsFromProperties(properties); + final int maxConcurrentActions = properties.containsKey("druid.indexer.server.maxConcurrentActions") + ? Integer.parseInt(properties.getProperty("druid.indexer.server.maxConcurrentActions")) + : ServerConfig.getDefaultMaxConcurrentRequests(serverHttpNumThreads); if (maxConcurrentActions > 0) { - // Add QoS filtering for action endpoints only - final String[] actionPaths = { - "/druid/indexer/v1/action", - }; - log.info("Overlord QoS filtering enabled for action endpoints. Max concurrent actions: [%d]", maxConcurrentActions); - JettyBindings.addQosFilter(binder, actionPaths, maxConcurrentActions); + JettyBindings.addQosFilter(binder, "/druid/indexer/v1/action", maxConcurrentActions); } else { - log.info("Overlord QoS filtering disabled for action endpoints. Max concurrent actions: [%d]", serverHttpNumThreads); + log.info("Overlord QoS filtering disabled for action endpoints."); } } }, @@ -527,11 +516,6 @@ private void configureOverlordWebResources(Binder binder) ); } - public static int getDefaultMaxConcurrentActions(int serverHttpNumThreads) - { - return Math.max(1, Math.max(serverHttpNumThreads - 4, (int) (serverHttpNumThreads * 0.8))); - } - /** */ private static class OverlordJettyServerInitializer implements JettyServerInitializer diff --git a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java index 031c38ff8b20..9fec28aeeabe 100644 --- a/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java +++ b/services/src/test/java/org/apache/druid/cli/CliOverlordTest.java @@ -26,6 +26,7 @@ import org.apache.druid.metadata.segment.SqlSegmentsMetadataManagerV2; import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache; import org.apache.druid.metadata.segment.cache.SegmentMetadataCache; +import org.apache.druid.server.initialization.ServerConfig; import org.junit.Assert; import org.junit.Test; @@ -53,27 +54,27 @@ public void testSegmentMetadataCacheIsBound() @Test - public void testGetDefaultMaxConcurrentActions() + public void testGetDefaultMaxConcurrentRequests() { // Small thread count where - Assert.assertEquals(8, CliOverlord.getDefaultMaxConcurrentActions(10)); + Assert.assertEquals(8, ServerConfig.getDefaultMaxConcurrentRequests(10)); // Medium thread count where - Assert.assertEquals(21, CliOverlord.getDefaultMaxConcurrentActions(25)); - Assert.assertEquals(26, CliOverlord.getDefaultMaxConcurrentActions(30)); + Assert.assertEquals(21, ServerConfig.getDefaultMaxConcurrentRequests(25)); + Assert.assertEquals(26, ServerConfig.getDefaultMaxConcurrentRequests(30)); // Large thread count - Assert.assertEquals(46, CliOverlord.getDefaultMaxConcurrentActions(50)); - Assert.assertEquals(96, CliOverlord.getDefaultMaxConcurrentActions(100)); + Assert.assertEquals(46, ServerConfig.getDefaultMaxConcurrentRequests(50)); + Assert.assertEquals(96, ServerConfig.getDefaultMaxConcurrentRequests(100)); // Test edge cases - return atleast 1 thread - Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(-1)); - Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(0)); + Assert.assertEquals(1, ServerConfig.getDefaultMaxConcurrentRequests(-1)); + Assert.assertEquals(1, ServerConfig.getDefaultMaxConcurrentRequests(0)); // Test small clustesr - Assert.assertEquals(2, CliOverlord.getDefaultMaxConcurrentActions(3)); - Assert.assertEquals(3, CliOverlord.getDefaultMaxConcurrentActions(4)); - Assert.assertEquals(4, CliOverlord.getDefaultMaxConcurrentActions(5)); + Assert.assertEquals(2, ServerConfig.getDefaultMaxConcurrentRequests(3)); + Assert.assertEquals(3, ServerConfig.getDefaultMaxConcurrentRequests(4)); + Assert.assertEquals(4, ServerConfig.getDefaultMaxConcurrentRequests(5)); } } From 4a9f42f46858189d047876c8c0613eb6473221e0 Mon Sep 17 00:00:00 2001 From: cryptoe Date: Tue, 7 Apr 2026 22:17:52 +0530 Subject: [PATCH 2/2] Doc updates --- docs/configuration/index.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 15d05d00b9c4..308acbc18b7b 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -746,6 +746,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |Property|Description|Default| |--------|-----------|-------| |`druid.coordinator.period`|The run period for the Coordinator. The Coordinator operates by maintaining the current state of the world in memory and periodically looking at the set of "used" segments and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|`PT60S`| +|`druid.coordinator.server.maxConcurrentRequests`|Maximum number of concurrent requests to coordinator API endpoints (`/druid/coordinator/v1/*`, `/druid-internal/*`) that the Coordinator will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `-1` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`| |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZooKeeper interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|`PT300S`| |`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical service.|`PT15M`| |`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`| @@ -990,7 +991,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro |Property|Description|Default| |--------|-----------|-------| |`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `remote`. The recommended option is `httpRemote`, which is similar to `remote` but uses HTTP to interact with Middle Managers instead of ZooKeeper.|`httpRemote`| -|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`| +|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `-1` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`| |`druid.indexer.storage.type`|Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. One of `local` or `metadata`. `local` is mainly for internal testing while `metadata` is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|`local`| |`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|`PT24H`| |`druid.indexer.tasklock.forceTimeChunkLock`|If set to true, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use, and may select the deprecated segment lock. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context-parameters). See [Task lock system](../ingestion/tasks.md#task-lock-system) for more details about locking in tasks.|true|