Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|Property|Description|Default|
|--------|-----------|-------|
|`druid.coordinator.period`|The run period for the Coordinator. The Coordinator operates by maintaining the current state of the world in memory and periodically looking at the set of "used" segments and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|`PT60S`|
|`druid.coordinator.server.maxConcurrentRequests`|Maximum number of concurrent requests to coordinator API endpoints (`/druid/coordinator/v1/*`, `/druid-internal/*`) that the Coordinator will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `-1` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`|
|`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZooKeeper interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|`PT300S`|
|`druid.coordinator.load.timeout`|The timeout duration for when the Coordinator assigns a segment to a Historical service.|`PT15M`|
|`druid.coordinator.balancer.strategy`|The [balancing strategy](../design/coordinator.md#balancing-segments-in-a-tier) used by the Coordinator to distribute segments among the Historical servers in a tier. The `cost` strategy distributes segments by minimizing a cost function, `diskNormalized` weights these costs with the disk usage ratios of the servers and `random` distributes segments randomly.|`cost`|
Expand Down Expand Up @@ -990,7 +991,7 @@ These Overlord static configurations can be defined in the `overlord/runtime.pro
|Property|Description|Default|
|--------|-----------|-------|
|`druid.indexer.runner.type`|Indicates whether tasks should be run locally using `local` or in a distributed environment using `remote`. The recommended option is `httpRemote`, which is similar to `remote` but uses HTTP to interact with Middle Managers instead of ZooKeeper.|`httpRemote`|
|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `0` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`|
|`druid.indexer.server.maxConcurrentActions`|Maximum number of concurrent action requests (such as getting locks, creating segments, fetching segments etc) that the Overlord will process simultaneously. This prevents thread exhaustion while preserving access to health check endpoints. Set to `-1` to disable quality of service filtering entirely. If not specified, defaults to `max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`.|`max(1, max(serverHttpNumThreads - 4, serverHttpNumThreads * 0.8))`|
|`druid.indexer.storage.type`|Indicates whether incoming tasks should be stored locally (in heap) or in metadata storage. One of `local` or `metadata`. `local` is mainly for internal testing while `metadata` is recommended in production because storing incoming tasks in metadata storage allows for tasks to be resumed if the Overlord should fail.|`local`|
|`druid.indexer.storage.recentlyFinishedThreshold`|Duration of time to store task results. Default is 24 hours. If you have hundreds of tasks running in a day, consider increasing this threshold.|`PT24H`|
|`druid.indexer.tasklock.forceTimeChunkLock`|If set to true, all tasks are enforced to use time chunk lock. If not set, each task automatically chooses a lock type to use, and may select the deprecated segment lock. This configuration can be overwritten by setting `forceTimeChunkLock` in the [task context](../ingestion/tasks.md#context-parameters). See [Task lock system](../ingestion/tasks.md#task-lock-system) for more details about locking in tasks.|true|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.zip.Deflater;

Expand Down Expand Up @@ -441,6 +442,17 @@
return Math.max(10, (JvmUtils.getRuntimeInfo().getAvailableProcessors() * 17) / 16 + 2) + 30;
}

public static int getNumThreadsFromProperties(Properties properties)
{
final String value = properties.getProperty("druid.server.http.numThreads");
return value == null ? getDefaultNumThreads() : Integer.parseInt(value);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
}

public static int getDefaultMaxConcurrentRequests(int numThreads)
{
return Math.max(1, Math.max(numThreads - 4, (int) (numThreads * 0.8)));
}

public static class UriComplianceDeserializer extends JsonDeserializer<UriCompliance>
{
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
*/
public class CliIndexerServerModule implements Module
{
public static final String SERVER_HTTP_NUM_THREADS_PROPERTY = "druid.server.http.numThreads";
private final Properties properties;

public CliIndexerServerModule(Properties properties)
Expand All @@ -62,12 +61,7 @@ public void configure(Binder binder)
LifecycleModule.register(binder, ChatHandlerResource.class);

// Use an equal number of threads for chat handler and non-chat handler requests.
int serverHttpNumThreads;
if (properties.getProperty(SERVER_HTTP_NUM_THREADS_PROPERTY) == null) {
serverHttpNumThreads = ServerConfig.getDefaultNumThreads();
} else {
serverHttpNumThreads = Integer.parseInt(properties.getProperty(SERVER_HTTP_NUM_THREADS_PROPERTY));
}
int serverHttpNumThreads = ServerConfig.getNumThreadsFromProperties(properties);

JettyBindings.addQosFilter(
binder,
Expand Down
15 changes: 15 additions & 0 deletions services/src/main/java/org/apache/druid/cli/CliCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.http.ServersResource;
import org.apache.druid.server.http.TiersResource;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.jetty.JettyBindings;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
import org.apache.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
Expand Down Expand Up @@ -229,6 +231,19 @@
binder.bind(JettyServerInitializer.class)
.to(CoordinatorJettyServerInitializer.class);

// QoS filtering to prevent heavy coordinator API requests from starving health check endpoints.
// Set druid.coordinator.server.maxConcurrentRequests=-1 to disable.
final int serverHttpNumThreads = ServerConfig.getNumThreadsFromProperties(properties);
final int maxConcurrentRequests = properties.containsKey("druid.coordinator.server.maxConcurrentRequests")
? Integer.parseInt(properties.getProperty("druid.coordinator.server.maxConcurrentRequests"))

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
: ServerConfig.getDefaultMaxConcurrentRequests(serverHttpNumThreads);
if (maxConcurrentRequests > 0) {
log.info("Coordinator QoS filtering enabled. Max concurrent requests: [%d]", maxConcurrentRequests);
JettyBindings.addQosFilter(binder, new String[]{"/druid/coordinator/v1/*", "/druid-internal/*"}, maxConcurrentRequests);
} else {
log.info("Coordinator QoS filtering disabled.");
}

Jerseys.addResource(binder, CoordinatorResource.class);
Jerseys.addResource(binder, CoordinatorCompactionResource.class);
Jerseys.addResource(binder, CoordinatorDynamicConfigsResource.class);
Expand Down
32 changes: 8 additions & 24 deletions services/src/main/java/org/apache/druid/cli/CliOverlord.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@
import org.apache.druid.server.http.RedirectInfo;
import org.apache.druid.server.http.SelfDiscoveryResource;
import org.apache.druid.server.initialization.ServerConfig;
import org.apache.druid.server.initialization.jetty.CliIndexerServerModule;
import org.apache.druid.server.initialization.jetty.JettyBindings;
import org.apache.druid.server.initialization.jetty.JettyServerInitUtils;
import org.apache.druid.server.initialization.jetty.JettyServerInitializer;
Expand Down Expand Up @@ -490,27 +489,17 @@
Jerseys.addResource(binder, OverlordDataSourcesResource.class);


final int serverHttpNumThreads = properties.containsKey(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY)
? Integer.parseInt(properties.getProperty(CliIndexerServerModule.SERVER_HTTP_NUM_THREADS_PROPERTY))
: ServerConfig.getDefaultNumThreads();

final int maxConcurrentActions;
if (properties.containsKey("druid.indexer.server.maxConcurrentActions")) {
maxConcurrentActions = Integer.parseInt(properties.getProperty("druid.indexer.server.maxConcurrentActions"));
} else {
maxConcurrentActions = getDefaultMaxConcurrentActions(serverHttpNumThreads);
}

// QoS filtering to prevent action requests from starving health check endpoints.
// Set druid.indexer.server.maxConcurrentActions=-1 to disable.
final int serverHttpNumThreads = ServerConfig.getNumThreadsFromProperties(properties);
final int maxConcurrentActions = properties.containsKey("druid.indexer.server.maxConcurrentActions")
? Integer.parseInt(properties.getProperty("druid.indexer.server.maxConcurrentActions"))

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException Note

Potential uncaught 'java.lang.NumberFormatException'.
: ServerConfig.getDefaultMaxConcurrentRequests(serverHttpNumThreads);
if (maxConcurrentActions > 0) {
// Add QoS filtering for action endpoints only
final String[] actionPaths = {
"/druid/indexer/v1/action",
};

log.info("Overlord QoS filtering enabled for action endpoints. Max concurrent actions: [%d]", maxConcurrentActions);
JettyBindings.addQosFilter(binder, actionPaths, maxConcurrentActions);
JettyBindings.addQosFilter(binder, "/druid/indexer/v1/action", maxConcurrentActions);
} else {
log.info("Overlord QoS filtering disabled for action endpoints. Max concurrent actions: [%d]", serverHttpNumThreads);
log.info("Overlord QoS filtering disabled for action endpoints.");
}
}
},
Expand All @@ -527,11 +516,6 @@
);
}

public static int getDefaultMaxConcurrentActions(int serverHttpNumThreads)
{
return Math.max(1, Math.max(serverHttpNumThreads - 4, (int) (serverHttpNumThreads * 0.8)));
}

/**
*/
private static class OverlordJettyServerInitializer implements JettyServerInitializer
Expand Down
23 changes: 12 additions & 11 deletions services/src/test/java/org/apache/druid/cli/CliOverlordTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.metadata.segment.SqlSegmentsMetadataManagerV2;
import org.apache.druid.metadata.segment.cache.HeapMemorySegmentMetadataCache;
import org.apache.druid.metadata.segment.cache.SegmentMetadataCache;
import org.apache.druid.server.initialization.ServerConfig;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -53,27 +54,27 @@ public void testSegmentMetadataCacheIsBound()


@Test
public void testGetDefaultMaxConcurrentActions()
public void testGetDefaultMaxConcurrentRequests()
{
// Small thread count where
Assert.assertEquals(8, CliOverlord.getDefaultMaxConcurrentActions(10));
Assert.assertEquals(8, ServerConfig.getDefaultMaxConcurrentRequests(10));

// Medium thread count where
Assert.assertEquals(21, CliOverlord.getDefaultMaxConcurrentActions(25));
Assert.assertEquals(26, CliOverlord.getDefaultMaxConcurrentActions(30));
Assert.assertEquals(21, ServerConfig.getDefaultMaxConcurrentRequests(25));
Assert.assertEquals(26, ServerConfig.getDefaultMaxConcurrentRequests(30));


// Large thread count
Assert.assertEquals(46, CliOverlord.getDefaultMaxConcurrentActions(50));
Assert.assertEquals(96, CliOverlord.getDefaultMaxConcurrentActions(100));
Assert.assertEquals(46, ServerConfig.getDefaultMaxConcurrentRequests(50));
Assert.assertEquals(96, ServerConfig.getDefaultMaxConcurrentRequests(100));

// Test edge cases - return atleast 1 thread
Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(-1));
Assert.assertEquals(1, CliOverlord.getDefaultMaxConcurrentActions(0));
Assert.assertEquals(1, ServerConfig.getDefaultMaxConcurrentRequests(-1));
Assert.assertEquals(1, ServerConfig.getDefaultMaxConcurrentRequests(0));

// Test small clustesr
Assert.assertEquals(2, CliOverlord.getDefaultMaxConcurrentActions(3));
Assert.assertEquals(3, CliOverlord.getDefaultMaxConcurrentActions(4));
Assert.assertEquals(4, CliOverlord.getDefaultMaxConcurrentActions(5));
Assert.assertEquals(2, ServerConfig.getDefaultMaxConcurrentRequests(3));
Assert.assertEquals(3, ServerConfig.getDefaultMaxConcurrentRequests(4));
Assert.assertEquals(4, ServerConfig.getDefaultMaxConcurrentRequests(5));
}
}
Loading