diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 15d05d00b9c4..172d8062a5e8 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -804,7 +804,7 @@ These Coordinator static configurations can be defined in the `coordinator/runti |--------|-----------|-------| |`druid.manager.config.pollDuration`|How often the manager polls the config table for updates.|`PT1M`| |`druid.manager.segments.pollDuration`|The duration between polls the Coordinator does for updates to the set of active segments. Generally defines the amount of lag time it can take for the Coordinator to notice new segments.|`PT1M`| -|`druid.manager.segments.useIncrementalCache`|(Experimental) Denotes the usage mode of the segment metadata incremental cache. This cache provides a performance improvement over the polling mechanism currently employed by the Coordinator as it retrieves payloads of only updated segments. Possible cache modes are: (a) `never`: Incremental cache is disabled. (b) `always`: Incremental cache is enabled. Service start-up will be blocked until cache has synced with the metadata store at least once. (c) `ifSynced`: Cache is enabled. This mode does not block service start-up and is a way to retain existing behavior of the Coordinator. If the incremental cache is in modes `always` or `ifSynced`, reads from the cache will block until it has synced with the metadata store at least once after becoming leader. The Coordinator never writes to this cache.|`never`| +|`druid.manager.segments.useIncrementalCache`|Denotes the usage mode of the segment metadata incremental cache. This cache provides a performance improvement over the polling mechanism currently employed by the Coordinator as it retrieves payloads of only updated segments. Possible cache modes are: (a) `never`: Incremental cache is disabled. (b) `always`: Incremental cache is enabled. Service start-up will be blocked until cache has synced with the metadata store at least once. 
(c) `ifSynced`: Cache is enabled. This mode does not block service start-up and is a way to retain existing behavior of the Coordinator. If the incremental cache is in modes `always` or `ifSynced`, reads from the cache will block until it has synced with the metadata store at least once after becoming leader. The Coordinator never writes to this cache.|`ifSynced`| |`druid.manager.rules.pollDuration`|The duration between polls the Coordinator does for updates to the set of active rules. Generally defines the amount of lag time it can take for the Coordinator to notice rules.|`PT1M`| |`druid.manager.rules.defaultRule`|The default rule for the cluster|`_default`| |`druid.manager.rules.alertThreshold`|The duration after a failed poll upon which an alert should be emitted.|`PT10M`| @@ -1053,14 +1053,14 @@ If autoscaling is enabled, you can set these additional configs: The `druid.supervisor.idleConfig.*` specification in the Overlord runtime properties defines the default behavior for the entire cluster. See [Idle Configuration in Kafka Supervisor IOConfig](../ingestion/kinesis-ingestion.md#io-configuration) to override it for an individual supervisor. -##### Segment metadata cache (Experimental) +##### Segment metadata cache The following properties pertain to segment metadata caching on the Overlord that may be used to speed up segment allocation and other metadata operations. |Property|Description|Default| |--------|-----------|-------| -|`druid.manager.segments.useIncrementalCache`|Denotes the usage mode of the segment metadata incremental cache. Possible modes are: (a) `never`: Cache is disabled. (b) `always`: Reads are always done from the cache. Service start-up will be blocked until cache has synced with the metadata store at least once. Transactions will block until cache has synced with the metadata store at least once after becoming leader. (c) `ifSynced`: Reads are done from the cache only if it has already synced with the metadata store. 
This mode does not block service start-up or transactions.|`never`| -|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when `druid.manager.segments.useIncrementalCache` is set to `always` or `ifSynced`.|`PT1M` (1 minute)| +|`druid.manager.segments.useIncrementalCache`|Denotes the usage mode of the segment metadata incremental cache. Possible modes are: (a) `never`: Cache is disabled. (b) `always`: Reads are always done from the cache. Service start-up will be blocked until cache has synced with the metadata store at least once. Transactions will block until cache has synced with the metadata store at least once after becoming leader. (c) `ifSynced`: Reads are done from the cache only if it has already synced with the metadata store. This mode does not block service start-up or transactions.|`ifSynced`| +|`druid.manager.segments.pollDuration`|Duration (in ISO 8601 format) between successive syncs of the cache with the metadata store. This property is used only when the incremental cache usage mode is set to `always` or `ifSynced`.|`PT1M` (1 minute)| ##### Auto-kill unused segments (Experimental) @@ -1069,7 +1069,7 @@ None of the configs that apply to [auto-kill performed by the Coordinator](../da |Property|Description|Default| |--------|-----------|-------| -|`druid.manager.segments.killUnused.enabled`|Boolean flag to enable auto-kill of eligible unused segments on the Overlord. This feature can be used only when [segment metadata caching](#segment-metadata-cache-experimental) is enabled on the Overlord and MUST NOT be enabled if `druid.coordinator.kill.on` is already set to `true` on the Coordinator.|`true`| +|`druid.manager.segments.killUnused.enabled`|Boolean flag to enable auto-kill of eligible unused segments on the Overlord. 
This feature can be used only when [segment metadata caching](#segment-metadata-cache) is enabled on the Overlord and MUST NOT be enabled if `druid.coordinator.kill.on` is already set to `true` on the Coordinator.|`true`| |`druid.manager.segments.killUnused.bufferPeriod`|Period after which a segment marked as unused becomes eligible for auto-kill on the Overlord. This config is effective only if `druid.manager.segments.killUnused.enabled` is set to `true`.|`P30D` (30 days)| #### Overlord dynamic configuration diff --git a/docs/data-management/automatic-compaction.md b/docs/data-management/automatic-compaction.md index 3f144e05a950..e455319d924a 100644 --- a/docs/data-management/automatic-compaction.md +++ b/docs/data-management/automatic-compaction.md @@ -28,6 +28,11 @@ In Apache Druid, compaction is a special type of ingestion task that reads data Auto-compaction skips datasources that have a segment granularity of `ALL`. ::: +There are two ways to run automatic compaction in Druid: + +* **Compaction supervisors on the Overlord** (recommended): Provides better reactivity, support for the MSQ task engine, and easier management through the supervisor framework. See [Auto-compaction using compaction supervisors](#auto-compaction-using-compaction-supervisors). +* **Coordinator duties**: An alternative approach that runs compaction as a Coordinator duty. See [Auto-compaction using Coordinator duties](#auto-compaction-using-coordinator-duties). + As a best practice, you should set up auto-compaction for all Druid datasources. You can run compaction tasks manually for cases where you want to allocate more system resources. For example, you may choose to run multiple compaction tasks in parallel to compact an existing datasource for the first time. See [Compaction](compaction.md) for additional details and use cases. This topic guides you through setting up automatic compaction for your Druid cluster. 
See the [examples](#examples) for common use cases for automatic compaction. @@ -52,14 +57,6 @@ The automatic compaction system uses the following syntax: } ``` -:::info - -The MSQ task engine is available as a compaction engine when you run automatic compaction as a compaction supervisor. For more information, see [Auto-compaction using compaction supervisors](#auto-compaction-using-compaction-supervisors). - -::: - -For automatic compaction using Coordinator duties, you submit the spec to the [Compaction config UI](#manage-auto-compaction-using-the-web-console) or the [Compaction configuration API](#manage-auto-compaction-using-coordinator-apis). - Most fields in the auto-compaction configuration correlate to a typical [Druid ingestion spec](../ingestion/ingestion-spec.md). The following properties only apply to auto-compaction: * `skipOffsetFromLatest` @@ -68,9 +65,9 @@ The following properties only apply to auto-compaction: Since the automatic compaction system provides a management layer on top of manual compaction tasks, the auto-compaction configuration does not include task-specific properties found in a typical Druid ingestion spec. -The following properties are automatically set by the Coordinator: +The following properties are automatically set by the auto-compaction system: * `type`: Set to `compact`. -* `id`: Generated using the task type, datasource name, interval, and timestamp. The task ID is prefixed with `coordinator-issued`. +* `id`: Generated using the task type, datasource name, interval, and timestamp. The task ID is prefixed with `auto`. * `context`: Set according to the user-provided `taskContext`. 
Compaction tasks typically fetch all [relevant segments](manual-compaction.md#compaction-io-configuration) prior to launching any subtasks, @@ -83,9 +80,136 @@ maximize performance and minimize disk usage of the `compact` tasks launched by For more details on each of the specs in an auto-compaction configuration, see [Automatic compaction dynamic configuration](../configuration/index.md#automatic-compaction-dynamic-configuration). +## Auto-compaction using compaction supervisors + +:::info +For advanced time-based data lifecycle management — such as coarsening segment granularity, deleting old rows, or changing compression as data ages — see [Cascading reindexing](cascading-reindexing.md). +::: + +The recommended way to run automatic compaction is using compaction supervisors on the Overlord. Compaction supervisors provide the following benefits: + +* Can use the supervisor framework to get information about the auto-compaction, such as status or state +* More easily suspend or resume compaction for a datasource +* Can use either the native compaction engine or the [MSQ task engine](#use-msq-for-auto-compaction) +* More reactive and submits tasks as soon as a compaction slot is available +* Tracked compaction task status to avoid re-compacting an interval repeatedly +* Can be configured to store compact fingerprints instead of full compaction state in metadata storage +* Supports minor compaction through the [Most fragmented first policy](../api-reference/automatic-compaction-api.md#compaction-policy-mostfragmentedfirst) + +To use a compaction supervisor, submit the auto-compaction configuration as a supervisor spec. Set `type` to `autocompact` and include the auto-compaction config in the `spec`. + +To submit an automatic compaction task, you can submit a supervisor spec through the [web console](#manage-compaction-supervisors-with-the-web-console) or the [supervisor API](#manage-compaction-supervisors-with-supervisor-apis). 
+ + +### Manage compaction supervisors with the web console + +To submit a supervisor spec for MSQ task engine automatic compaction, perform the following steps: + +1. In the web console, go to the **Supervisors** tab. +1. Click **...** > **Submit JSON supervisor**. +1. In the dialog, include the following: + - The type of supervisor spec by setting `"type": "autocompact"` + - The compaction configuration by adding it to the `spec` field + ```json + { + "type": "autocompact", + "spec": { + "dataSource": YOUR_DATASOURCE, + "tuningConfig": {...}, + "granularitySpec": {...}, + "engine": , + ... + } + ``` +1. Submit the supervisor. + +To stop the automatic compaction task, suspend or terminate the supervisor through the UI or API. + +### Manage compaction supervisors with supervisor APIs + +Submitting an automatic compaction as a supervisor task uses the same endpoint as supervisor tasks for streaming ingestion. + +The following example configures auto-compaction for the `wikipedia` datasource: + +```sh +curl --location --request POST 'http://localhost:8081/druid/indexer/v1/supervisor' \ +--header 'Content-Type: application/json' \ +--data-raw '{ + "type": "autocompact", // required + "suspended": false, // optional + "spec": { // required + "dataSource": "wikipedia", // required + "tuningConfig": {...}, // optional + "granularitySpec": {...}, // optional + "engine": , // optional + ... + } +}' +``` + +Note that if you omit `spec.engine`, Druid uses the default compaction engine. You can control the default compaction engine by setting `engine` in the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config). If `spec.engine` and `engine` in compaction dynamic config are omitted, Druid defaults to the `native` engine. + +To stop the automatic compaction task, suspend or terminate the supervisor through the UI or API. 
+ +### Use MSQ for auto-compaction + +The MSQ task engine is available as a compaction engine if you configure auto-compaction to use compaction supervisors. To use the MSQ task engine for automatic compaction, make sure the following requirements are met: + +* Enable [incremental segment metadata caching](../configuration/index.md#metadata-retrieval) on the Overlord. +* Enable [Auto-compaction using compaction supervisors](#auto-compaction-using-compaction-supervisors). +* Update the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config) and set `engine` to `msq` to use the MSQ task engine as the default compaction engine for all compaction supervisors. + * Alternatively, you may override the default engine by setting `spec.engine` to `msq` in the compaction supervisor for the relevant datasource. +* Have at least two compaction task slots available or set `spec.taskContext.maxNumTasks` to two or more. The MSQ task engine requires at least two tasks to run, one controller task and one worker task. + +You can use [MSQ task engine context parameters](../multi-stage-query/reference.md#context-parameters) in `spec.taskContext` when configuring your datasource for automatic compaction, such as setting the maximum number of tasks using the `spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context parameters overlap with automatic compaction parameters. When these settings overlap, set one or the other. + + +#### MSQ task engine limitations + + + +When using the MSQ task engine for auto-compaction, keep the following limitations in mind: + +- The `metricSpec` field is only supported for certain aggregators. For more information, see [Supported aggregators](#supported-aggregators). +- Only dynamic and range-based partitioning are supported. +- Set `rollup` to `true` if and only if `metricSpec` is not empty or null. +- You can only partition on string dimensions. 
However, multi-valued string dimensions are not supported. +- The `maxTotalRows` config is not supported in `DynamicPartitionsSpec`. Use `maxRowsPerSegment` instead. +- Segments can only be sorted on `__time` as the first column. + +#### Supported aggregators + +Auto-compaction using the MSQ task engine supports only aggregators that satisfy the following properties: +* __Mergeability__: can combine partial aggregates +* __Idempotency__: produces the same results on repeated runs of the aggregator on previously aggregated values in a column + +This is exemplified by the following `longSum` aggregator: + +``` +{"name": "added", "type": "longSum", "fieldName": "added"} +``` + +where `longSum` being capable of combining partial results satisfies mergeability, while input and output column being the same (`added`) ensures idempotency. + +The following are some examples of aggregators that aren't supported since at least one of the required conditions isn't satisfied: + +* `longSum` aggregator where the `added` column rolls up into `sum_added` column discarding the input `added` column, violating idempotency, as subsequent runs would no longer find the `added` column: + ``` + {"name": "sum_added", "type": "longSum", "fieldName": "added"} + ``` +* Partial sketches which cannot themselves be used to combine partial aggregates and need merging aggregators -- such as `HLLSketchMerge` required for `HLLSketchBuild` aggregator below -- violating mergeability: + ``` + {"name": "added", "type": "HLLSketchBuild", "fieldName": "added"} + ``` +* Count aggregator since it cannot be used to combine partial aggregates and it rolls up into a different `count` column discarding the input column(s), violating both mergeability and idempotency. + ``` + {"type": "count", "name": "count"} + ``` + + ## Auto-compaction using Coordinator duties -You can control how often the Coordinator checks to see if auto-compaction is needed. 
The Coordinator [indexing period](../configuration/index.md#data-management), `druid.coordinator.period.indexingPeriod`, controls the frequency of compaction tasks. +As an alternative to compaction supervisors, you can run automatic compaction using Coordinator duties. The Coordinator [indexing period](../configuration/index.md#data-management), `druid.coordinator.period.indexingPeriod`, controls the frequency of compaction tasks. The default indexing period is 30 minutes, meaning that the Coordinator first checks for segments to compact at most 30 minutes from when auto-compaction is enabled. This time period also affects other Coordinator duties such as cleanup of unused segments and stale pending segments. To configure the auto-compaction time period without interfering with `indexingPeriod`, see [Set frequency of compaction runs](#change-compaction-frequency). @@ -94,10 +218,15 @@ At every invocation of auto-compaction, the Coordinator initiates a [segment sea When there are eligible segments to compact, the Coordinator issues compaction tasks based on available worker capacity. If a compaction task takes longer than the indexing period, the Coordinator waits for it to finish before resuming the period for segment search. -No additional configuration is needed to run automatic compaction tasks using the Coordinator and native engine. This is the default behavior for Druid. -You can configure it for a datasource through the web console or programmatically via an API. +You can configure Coordinator-based auto-compaction for a datasource through the web console or programmatically via an API. This process differs for manual compaction tasks, which can be submitted from the [Tasks view of the web console](../operations/web-console.md) or the [Tasks API](../api-reference/tasks-api.md). 
+To use Coordinator-based auto-compaction, the following configuration requirements must be met: + +* Update the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config) and set: + * `useSupervisors` to `false` + * `engine` to `native` to use the native engine, since the MSQ task engine is not supported as a compaction engine on the Coordinator. + ### Manage auto-compaction using the web console Use the web console to enable automatic compaction for a datasource as follows: @@ -106,14 +235,14 @@ Use the web console to enable automatic compaction for a datasource as follows: 2. In the **Compaction** column, click the edit icon for the datasource to compact. 3. In the **Compaction config** dialog, configure the auto-compaction settings. The dialog offers a form view as well as a JSON view. Editing the form updates the JSON specification, and editing the JSON updates the form field, if present. Form fields not present in the JSON indicate default values. You may add additional properties to the JSON for auto-compaction settings not displayed in the form. See [Configure automatic compaction](#auto-compaction-syntax) for supported settings for auto-compaction. 4. Click **Submit**. -5. Refresh the **Datasources** view. The **Compaction** column for the datasource changes from “Not enabled” to “Awaiting first run.” +5. Refresh the **Datasources** view. The **Compaction** column for the datasource changes from "Not enabled" to "Awaiting first run." The following screenshot shows the compaction config dialog for a datasource with auto-compaction enabled. ![Compaction config in web console](../assets/compaction-config.png) To disable auto-compaction for a datasource, click **Delete** from the **Compaction config** dialog. Druid does not retain your auto-compaction configuration. 
-### Manage auto-compaction using Coordinator APIs +### Manage auto-compaction using Coordinator APIs Use the [Automatic compaction API](../api-reference/automatic-compaction-api.md#manage-automatic-compaction) to configure automatic compaction. To enable auto-compaction for a datasource, create a JSON object with the desired auto-compaction settings. @@ -201,7 +330,7 @@ The following auto-compaction configuration compacts existing `HOUR` segments in "granularitySpec": { "segmentGranularity": "DAY" }, - "skipOffsetFromLatest": "P1W", + "skipOffsetFromLatest": "P1W" } ``` @@ -228,144 +357,6 @@ The following auto-compaction configuration compacts updates the `wikipedia` seg } ``` -## Auto-compaction using compaction supervisors - -:::info -For advanced time-based data lifecycle management — such as coarsening segment granularity, deleting old rows, or changing compression as data ages — see [Cascading reindexing](cascading-reindexing.md). -::: - -You can run automatic compaction using compaction supervisors on the Overlord rather than Coordinator duties. Compaction supervisors provide the following benefits over Coordinator duties: - -* Can use the supervisor framework to get information about the auto-compaction, such as status or state -* More easily suspend or resume compaction for a datasource -* Can use either the native compaction engine or the [MSQ task engine](#use-msq-for-auto-compaction) -* More reactive and submits tasks as soon as a compaction slot is available -* Tracked compaction task status to avoid re-compacting an interval repeatedly -* Uses new Indexing State Fingerprinting mechanisms to store less data per segment in metadata storage - - -To use compaction supervisors, the following configuration requirements must be met: - -* You must be using incremental segment metadata caching: - * `druid.manager.segments.useIncrementalCache` set to `always` or `ifSynced` in your Overlord and Coordinator runtime properties. 
- * See [Segment metadata caching](../configuration/index.md#metadata-retrieval) for full configuration documentation. - -* update the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config) and set: - * `useSupervisors` to `true` so that compaction tasks can be run as supervisor tasks - * `engine` to `msq` to use the MSQ task engine as the compaction engine or to `native` (default value) to use the native engine. - -Compaction supervisors use the same syntax as auto-compaction using Coordinator duties with one key difference: you submit the auto-compaction as a supervisor spec. In the spec, set the `type` to `autocompact` and include the auto-compaction config in the `spec`. - -To submit an automatic compaction task, you can submit a supervisor spec through the [web console](#manage-compaction-supervisors-with-the-web-console) or the [supervisor API](#manage-compaction-supervisors-with-supervisor-apis). - - -### Manage compaction supervisors with the web console - -To submit a supervisor spec for MSQ task engine automatic compaction, perform the following steps: - -1. In the web console, go to the **Supervisors** tab. -1. Click **...** > **Submit JSON supervisor**. -1. In the dialog, include the following: - - The type of supervisor spec by setting `"type": "autocompact"` - - The compaction configuration by adding it to the `spec` field - ```json - { - "type": "autocompact", - "spec": { - "dataSource": YOUR_DATASOURCE, - "tuningConfig": {...}, - "granularitySpec": {...}, - "engine": , - ... - } - ``` -1. Submit the supervisor. - -To stop the automatic compaction task, suspend or terminate the supervisor through the UI or API. - -### Manage compaction supervisors with supervisor APIs - -Submitting an automatic compaction as a supervisor task uses the same endpoint as supervisor tasks for streaming ingestion. 
- -The following example configures auto-compaction for the `wikipedia` datasource: - -```sh -curl --location --request POST 'http://localhost:8081/druid/indexer/v1/supervisor' \ ---header 'Content-Type: application/json' \ ---data-raw '{ - "type": "autocompact", // required - "suspended": false, // optional - "spec": { // required - "dataSource": "wikipedia", // required - "tuningConfig": {...}, // optional - "granularitySpec": {...}, // optional - "engine": , // optional - ... - } -}' -``` - -Note that if you omit `spec.engine`, Druid uses the default compaction engine. You can control the default compaction engine by setting `engine` in the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config). If `spec.engine` and `engine` in compaction dynamic config are omitted, Druid defaults to the `native` engine. - -To stop the automatic compaction task, suspend or terminate the supervisor through the UI or API. - -### Use MSQ for auto-compaction - -The MSQ task engine is available as a compaction engine if you configure auto-compaction to use compaction supervisors. To use the MSQ task engine for automatic compaction, make sure the following requirements are met: - -* Enable [incremental segment metadata caching](../configuration/index.md#metadata-retrieval) on the Overlord. -* Enable [Auto-compaction using compaction supervisors](#auto-compaction-using-compaction-supervisors). -* Update the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config) and set `engine` to `msq` to use the MSQ task engine as the default compaction engine for all compaction supervisors. - * Alternatively, you may override the default engine by setting `spec.engine` to `msq` in the compaction supervisor for the relevant datasource. -* Have at least two compaction task slots available or set `spec.taskContext.maxNumTasks` to two or more. 
The MSQ task engine requires at least two tasks to run, one controller task and one worker task. - -You can use [MSQ task engine context parameters](../multi-stage-query/reference.md#context-parameters) in `spec.taskContext` when configuring your datasource for automatic compaction, such as setting the maximum number of tasks using the `spec.taskContext.maxNumTasks` parameter. Some of the MSQ task engine context parameters overlap with automatic compaction parameters. When these settings overlap, set one or the other. - - -#### MSQ task engine limitations - - - -When using the MSQ task engine for auto-compaction, keep the following limitations in mind: - -- The `metricSpec` field is only supported for certain aggregators. For more information, see [Supported aggregators](#supported-aggregators). -- Only dynamic and range-based partitioning are supported. -- Set `rollup` to `true` if and only if `metricSpec` is not empty or null. -- You can only partition on string dimensions. However, multi-valued string dimensions are not supported. -- The `maxTotalRows` config is not supported in `DynamicPartitionsSpec`. Use `maxRowsPerSegment` instead. -- Segments can only be sorted on `__time` as the first column. - -#### Supported aggregators - -Auto-compaction using the MSQ task engine supports only aggregators that satisfy the following properties: -* __Mergeability__: can combine partial aggregates -* __Idempotency__: produces the same results on repeated runs of the aggregator on previously aggregated values in a column - -This is exemplified by the following `longSum` aggregator: - -``` -{"name": "added", "type": "longSum", "fieldName": "added"} -``` - -where `longSum` being capable of combining partial results satisfies mergeability, while input and output column being the same (`added`) ensures idempotency. 
- -The following are some examples of aggregators that aren't supported since at least one of the required conditions aren't satisfied: - -* `longSum` aggregator where the `added` column rolls up into `sum_added` column discarding the input `added` column, violating idempotency, as subsequent runs would no longer find the `added` column: - ``` - {"name": "sum_added", "type": "longSum", "fieldName": "added"} - ``` -* Partial sketches which cannot themselves be used to combine partial aggregates and need merging aggregators -- such as `HLLSketchMerge` required for `HLLSketchBuild` aggregator below -- violating mergeability: - ``` - {"name": "added", "type": "HLLSketchBuild", "fieldName": "added"} - ``` -* Count aggregator since it cannot be used to combine partial aggregates and it rolls up into a different `count` column discarding the input column(s), violating both mergeability and idempotency. - ``` - {"type": "count", "name": "count"} - ``` - - - ## Learn more See the following topics for more information: diff --git a/docs/data-management/cascading-reindexing.md b/docs/data-management/cascading-reindexing.md index 5f7b7f37b093..1cc11764ad05 100644 --- a/docs/data-management/cascading-reindexing.md +++ b/docs/data-management/cascading-reindexing.md @@ -40,9 +40,7 @@ Cascading reindexing handles all of this automatically by generating a timeline Before using cascading reindexing, ensure your cluster meets the following requirements: -- **Compaction supervisors enabled**: Set `useSupervisors` to `true` in the [compaction dynamic config](../api-reference/automatic-compaction-api.md#update-cluster-level-compaction-config). - **MSQ compaction engine**: Set `engine` to `msq` in the compaction dynamic config or in the supervisor spec. -- **Incremental segment metadata caching**: Set `druid.manager.segments.useIncrementalCache` to `always` or `ifSynced` in your Overlord and Coordinator runtime properties. 
See [Segment metadata caching](../configuration/index.md#metadata-retrieval). - **At least two compaction task slots**: The MSQ task engine requires at least two tasks (one controller, one worker). ## How cascading reindexing works diff --git a/docs/data-management/delete.md b/docs/data-management/delete.md index e37ba48b5440..db87e3768d9e 100644 --- a/docs/data-management/delete.md +++ b/docs/data-management/delete.md @@ -127,7 +127,7 @@ Refer to [Data management on the Coordinator](../configuration/index.md#data-man :::info This is an experimental feature that: -- Can be used only if [segment metadata caching](../configuration/index.md#segment-metadata-cache-experimental) is enabled on the Overlord. +- Can be used only if [segment metadata caching](../configuration/index.md#segment-metadata-cache) is enabled on the Overlord. - MUST NOT be used if auto-kill of unused segments is already enabled on the Coordinator. ::: diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 5b99f1be5b02..6f87b7f5c963 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -364,7 +364,7 @@ If the JVM does not support CPU time measurement for the current thread, `ingest ### Segment metadata cache -The following metrics are emitted only when [segment metadata caching](../configuration/index.md#segment-metadata-cache-experimental) is enabled on the Overlord. +The following metrics are emitted only when [segment metadata caching](../configuration/index.md#segment-metadata-cache) is enabled on the Overlord. 
|Metric|Description|Dimensions| |------|-----------|----------| diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java index d7d12b397dca..63eb0bbaff14 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java @@ -49,6 +49,7 @@ import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; @@ -98,7 +99,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; import java.io.Closeable; import java.util.ArrayList; @@ -188,14 +188,24 @@ public class AutoCompactionTest extends CompactionTestBase ) ); - private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10000; + private static final int MAX_ROWS_PER_SEGMENT_COMPACTED = 10_000; private static final Period NO_SKIP_OFFSET = Period.seconds(0); private static final FixedIntervalOrderPolicy COMPACT_NOTHING_POLICY = new FixedIntervalOrderPolicy(List.of()); + private static final ClusterCompactionConfig DEFAULT_LEGACY_COMPACTION_CONFIG = new ClusterCompactionConfig(0.5, 10, null, false, CompactionEngine.NATIVE, true); public static List getEngine() { return List.of(CompactionEngine.NATIVE); } + + public static List getClusterCompactionConfig() + { + return List.of( + DEFAULT_LEGACY_COMPACTION_CONFIG, + new 
ClusterCompactionConfig(0.5, 10, null, true, CompactionEngine.NATIVE, true), + new ClusterCompactionConfig(0.5, 10, null, true, CompactionEngine.MSQ, true) + ); + } @Override protected EmbeddedDruidCluster createCluster() @@ -203,14 +213,14 @@ protected EmbeddedDruidCluster createCluster() // Use timeout required for hash partitioning task return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper() .useLatchableEmitter() - .useDefaultTimeoutForLatchableEmitter(30) + .useDefaultTimeoutForLatchableEmitter(300) .addExtension(SketchModule.class) .addExtension(HllSketchModule.class) .addExtension(DoublesSketchModule.class) .addServer(overlord) .addServer(coordinator) .addServer(broker) - .addServer(new EmbeddedIndexer().addProperty("druid.worker.capacity", "10")) + .addServer(new EmbeddedIndexer().addProperty("druid.worker.capacity", "10").setServerMemory(2_000_000_000L)) .addServer(new EmbeddedHistorical()) .addServer(new EmbeddedRouter()); } @@ -226,10 +236,9 @@ public void setupClient() } @BeforeEach - public void resetCompactionTaskSlots() + public void resetClusterCompactionConfig() { - // Set compaction slot to 5 - updateCompactionTaskSlot(0.5, 10); + updateClusterConfig(DEFAULT_LEGACY_COMPACTION_CONFIG); fullDatasourceName = dataSource; } @@ -353,7 +362,7 @@ public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExis } } - @Test() + @Test public void testAutoCompactionOnlyRowsWithoutMetricShouldAddNewMetrics() throws Exception { // added = 31, count = null, sum_added = null @@ -1504,12 +1513,11 @@ public void testAutoCompactionDutyWithDimensionsSpec(CompactionEngine engine) th } } - @ValueSource(booleans = {false}) - @ParameterizedTest(name = "useSupervisors={0}") - public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exception + @MethodSource("getClusterCompactionConfig") + @ParameterizedTest(name = "clusterCompactionConfig={0}") + public void testAutoCompactionDutyWithFilter(ClusterCompactionConfig config) throws 
Exception { - updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true)); - + updateClusterConfig(config); loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { final List intervalsBeforeCompaction = getSegmentIntervals(); @@ -1533,7 +1541,7 @@ public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exce false, CompactionEngine.NATIVE ); - forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2); + forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2); // For dim "page", result should only contain value "Striker Eureka" verifyScanResult("added", "459.0"); @@ -1542,17 +1550,17 @@ public void testAutoCompactionDutyWithFilter(boolean useSupervisors) throws Exce List compactTasksBefore = getCompleteTasksForDataSource(fullDatasourceName); // Verify compacted segments does not get compacted again - forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2); + forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2); List compactTasksAfter = getCompleteTasksForDataSource(fullDatasourceName); Assertions.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); } } - @ValueSource(booleans = {false}) - @ParameterizedTest(name = "useSupervisors={0}") - public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws Exception + @MethodSource("getClusterCompactionConfig") + @ParameterizedTest(name = "clusterCompactionConfig={0}") + public void testAutoCompationDutyWithMetricsSpec(ClusterCompactionConfig config) throws Exception { - updateClusterConfig(new ClusterCompactionConfig(0.5, 10, null, useSupervisors, null, true)); + updateClusterConfig(config); loadData(INDEX_TASK); try (final Closeable ignored = unloader(fullDatasourceName)) { @@ -1576,7 +1584,7 @@ public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws false, CompactionEngine.NATIVE ); - 
forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2); + forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2); // Result should be the same with the addition of new metrics, "double_sum_added" and "long_sum_added". // These new metrics should have the same value as the input field "added" @@ -1587,7 +1595,7 @@ public void testAutoCompationDutyWithMetricsSpec(boolean useSupervisors) throws List compactTasksBefore = getCompleteTasksForDataSource(fullDatasourceName); // Verify compacted segments does not get compacted again - forceTriggerAutoCompaction(intervalsBeforeCompaction, useSupervisors, 2); + forceTriggerOrWaitForCompaction(intervalsBeforeCompaction, config, 2); List compactTasksAfter = getCompleteTasksForDataSource(fullDatasourceName); Assertions.assertEquals(compactTasksAfter.size(), compactTasksBefore.size()); } @@ -1775,40 +1783,23 @@ private void submitCompactionConfig( CompactionEngine engine ) { + UserCompactionTaskQueryTuningConfig tuningConfig = + UserCompactionTaskQueryTuningConfig.builder() + .partitionsSpec(partitionsSpec) + .splitHintSpec(new MaxSizeSplitHintSpec(null, 1)) + .maxNumConcurrentSubTasks(1) + .totalNumMergeTasks(1) + .build(); DataSourceCompactionConfig dataSourceCompactionConfig = InlineSchemaDataSourceCompactionConfig.builder() .forDataSource(fullDatasourceName) .withSkipOffsetFromLatest(skipOffsetFromLatest) - .withTuningConfig( - new UserCompactionTaskQueryTuningConfig( - null, - null, - null, - null, - new MaxSizeSplitHintSpec(null, 1), - partitionsSpec, - null, - null, - null, - null, - null, - 1, - null, - null, - null, - null, - null, - 1, - null - ) - ) + .withTuningConfig(tuningConfig) .withGranularitySpec(granularitySpec) .withDimensionsSpec(dimensionsSpec) .withMetricsSpec(metricsSpec) .withTransformSpec(transformSpec) - .withIoConfig( - !dropExisting ? null : new UserCompactionTaskIOConfig(true) - ) + .withIoConfig(!dropExisting ? 
null : new UserCompactionTaskIOConfig(true)) .withEngine(engine) .withTaskContext(ImmutableMap.of("maxNumTasks", 2)) .build(); @@ -1840,32 +1831,31 @@ private void deleteCompactionConfig() * {@link #fullDatasourceName}, and verifies the total number of segments in * the datasource after compaction. */ - private void forceTriggerAutoCompaction( + private void forceTriggerOrWaitForCompaction( List intervals, - boolean useSupervisors, + ClusterCompactionConfig config, int numExpectedSegmentsAfterCompaction - ) throws Exception + ) { - if (useSupervisors) { + if (config.isUseSupervisors()) { // Enable compaction for the requested intervals final FixedIntervalOrderPolicy policy = new FixedIntervalOrderPolicy( intervals.stream().map( interval -> new FixedIntervalOrderPolicy.Candidate(fullDatasourceName, interval) ).collect(Collectors.toList()) ); - updateClusterConfig( - new ClusterCompactionConfig(0.5, intervals.size(), policy, true, null, true) - ); + updateClusterConfig(config.toBuilder().compactionPolicy(policy).build()); - // Wait for scheduler to pick up the compaction job - // Instead of sleep, we can latch on a relevant metric later - Thread.sleep(30_000); + // Wait for all compaction jobs to be submitted + overlord.latchableEmitter().waitForEvent( + event -> event.hasMetricName("interval/waitCompact/count") + .hasDimension(DruidMetrics.DATASOURCE, dataSource) + .hasValueMatching(Matchers.equalTo(0L)) + ); waitForCompactionToFinish(numExpectedSegmentsAfterCompaction); // Disable all compaction - updateClusterConfig( - new ClusterCompactionConfig(0.5, intervals.size(), COMPACT_NOTHING_POLICY, true, null, true) - ); + updateClusterConfig(config.toBuilder().compactionPolicy(COMPACT_NOTHING_POLICY).build()); } else { forceTriggerAutoCompaction(numExpectedSegmentsAfterCompaction); } @@ -1908,7 +1898,7 @@ private void verifyTombstones(int expectedCompactedTombstoneCount) Assertions.assertEquals(actualTombstoneCount, expectedCompactedTombstoneCount); } - private void 
verifySegmentsCompacted(PartitionsSpec partitionsSpec, int expectedCompactedSegmentCount) + private void verifySegmentsCompacted(PartitionsSpec expectedPartitionsSpec, int expectedCompactedSegmentCount) { Set segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord); List foundCompactedSegments = new ArrayList<>(); @@ -1921,7 +1911,7 @@ private void verifySegmentsCompacted(PartitionsSpec partitionsSpec, int expected for (DataSegment compactedSegment : foundCompactedSegments) { Assertions.assertNotNull(compactedSegment.getLastCompactionState()); Assertions.assertNotNull(compactedSegment.getLastCompactionState().getPartitionsSpec()); - Assertions.assertEquals(compactedSegment.getLastCompactionState().getPartitionsSpec(), partitionsSpec); + Assertions.assertEquals(expectedPartitionsSpec, compactedSegment.getLastCompactionState().getPartitionsSpec()); } } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java index 708ad28607e2..9aca257f663b 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionUpgradeTest.java @@ -22,9 +22,11 @@ import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.metadata.TestDerbyConnector; +import org.apache.druid.server.coordinator.CatalogDataSourceCompactionConfig; import org.apache.druid.server.coordinator.DataSourceCompactionConfig; import 
org.apache.druid.server.coordinator.DruidCompactionConfig; import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig; @@ -73,8 +75,8 @@ public void test_configCanBeUpdated_afterVersionUpgrades() throws Exception overlord.start(); // Verify that compaction config already exist. This config was inserted manually into the database using SQL script. - DruidCompactionConfig coordinatorCompactionConfig = DruidCompactionConfig.empty() - .withDatasourceConfigs(compactionResource.getAllCompactionConfigs()); + DruidCompactionConfig coordinatorCompactionConfig = + DruidCompactionConfig.empty().withDatasourceConfigs(compactionResource.getAllCompactionConfigs()); DataSourceCompactionConfig foundDataSourceCompactionConfig = coordinatorCompactionConfig.findConfigForDatasource(dataSource).orNull(); Assertions.assertNotNull(foundDataSourceCompactionConfig); @@ -132,18 +134,16 @@ public void test_configCanBeUpdated_afterVersionUpgrades() throws Exception */ private void insertMinimalCompactionConfig(TestDerbyConnector sqlConnector) { - final String configJson = StringUtils.format( - "{\"compactionConfigs\":[{\"dataSource\":\"%s\"}]}", - dataSource - ); - + DataSourceCompactionConfig dataSourceCompactionConfig = + new CatalogDataSourceCompactionConfig(dataSource, null, Period.ZERO, null, null, null, null); + DruidCompactionConfig config = DruidCompactionConfig.legacy().withDatasourceConfig(dataSourceCompactionConfig); sqlConnector.retryWithHandle( handle -> handle.insert( StringUtils.format( "INSERT INTO %s (name, payload) VALUES ('coordinator.compaction.config',?)", sqlConnector.getMetadataTablesConfig().getConfigTable() ), - configJson.getBytes(StandardCharsets.UTF_8) + new DefaultObjectMapper().writeValueAsString(config).getBytes(StandardCharsets.UTF_8) ) ); } diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java 
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java index 0229add690c8..2abd48a52917 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionResourceTestClient.java @@ -34,6 +34,7 @@ import org.apache.druid.testing.embedded.EmbeddedDruidCluster; import org.apache.druid.testing.embedded.EmbeddedServiceClient; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.junit.jupiter.api.Assertions; import java.util.List; import java.util.Map; @@ -124,6 +125,7 @@ public void forceTriggerAutoCompaction() clusterConfig.getCompactionTaskSlotRatio(), clusterConfig.getMaxCompactionTaskSlots() ); + Assertions.assertFalse(clusterConfig.isUseSupervisors()); final CompactionSimulateResult simulateResult = simulateRunOnCoordinator(); log.info( "Triggering compaction duty on Coordinator. Expected jobs: %s", diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java index c445d52a3838..eae3a492f6ab 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java @@ -275,7 +275,7 @@ public void test_getTotalWorkerCapacity() public void test_isCompactionSupervisorEnabled() { Boolean result = cluster.callApi().onLeaderOverlord(OverlordClient::isCompactionSupervisorEnabled); - Assertions.assertFalse(result); + Assertions.assertTrue(result); } @Test diff --git a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java index 
d5093d4ffafe..13793def20de 100644 --- a/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java +++ b/extensions-core/druid-catalog/src/test/java/org/apache/druid/catalog/compact/CatalogCompactionTest.java @@ -26,6 +26,7 @@ import org.apache.druid.catalog.model.table.TableBuilder; import org.apache.druid.catalog.sync.CatalogClient; import org.apache.druid.common.utils.IdUtils; +import org.apache.druid.indexer.CompactionEngine; import org.apache.druid.indexing.common.task.IndexTask; import org.apache.druid.indexing.common.task.TaskBuilder; import org.apache.druid.indexing.compact.CompactionSupervisorSpec; @@ -112,7 +113,7 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity() // Create a catalog compaction config CatalogDataSourceCompactionConfig compactionConfig = - new CatalogDataSourceCompactionConfig(dataSource, null, Period.ZERO, null, null, null, null); + new CatalogDataSourceCompactionConfig(dataSource, CompactionEngine.NATIVE, Period.ZERO, null, null, null, null); final CompactionSupervisorSpec compactionSupervisor = new CompactionSupervisorSpec(compactionConfig, false, null); @@ -123,6 +124,7 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity() event -> event.hasMetricName("task/run/time") .hasDimension(DruidMetrics.TASK_TYPE, "compact") .hasDimension(DruidMetrics.DATASOURCE, dataSource) + .hasDimension(DruidMetrics.TASK_STATUS, "SUCCESS") ); // Verify that segments are now compacted to MONTH granularity diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java index b3b759a64c20..0a7ee7c49cb0 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java +++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTaskIdTest.java @@ -28,25 +28,25 @@ public class K8sTaskIdTest @Test public void testModifyingTaskIDToBeK8sCompliant() { - String original = "coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z"; + String original = "auto_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z"; String result = new K8sTaskId(null, original).getK8sJobName(); - Assertions.assertEquals("coordinatorissuedcompactk8smet-2e2c1862cb7ad1d01f4794b27a4438b0", result); + Assertions.assertEquals("autocompactk8smetricsaeifmefd2-4b1aea2eb0593c194d7b4046df1bdd22", result); } @Test public void testModifyingTaskIDWithEmptyK8sTaskPrefixToBeK8sCompliant() { - String original = "coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z"; + String original = "auto_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z"; String result = new K8sTaskId("", original).getK8sJobName(); - Assertions.assertEquals("coordinatorissuedcompactk8smet-2e2c1862cb7ad1d01f4794b27a4438b0", result); + Assertions.assertEquals("autocompactk8smetricsaeifmefd2-4b1aea2eb0593c194d7b4046df1bdd22", result); } @Test public void testModifyingTaskIDWithLongK8sTaskPrefixToBeK8sCompliant() { - String original = "coordinator-issued_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z"; + String original = "auto_compact_k8smetrics_aeifmefd_2022-08-18T15:33:26.094Z"; String result = new K8sTaskId("this-is_a:VERY-very-long-task-name-prefix", original).getK8sJobName(); - Assertions.assertEquals("thisisaveryverylongtasknamepre-2e2c1862cb7ad1d01f4794b27a4438b0", result); + Assertions.assertEquals("thisisaveryverylongtasknamepre-4b1aea2eb0593c194d7b4046df1bdd22", result); } @Test diff --git a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java index b5919efc0920..f449fb31ca12 100644 --- a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java +++ b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/KubernetesOverlordUtilsTest.java @@ -71,8 +71,8 @@ public void test_stripJobName() @Test public void test_stripJobName_avoidDuplicatesWithLongDataSourceName() { - String jobName1 = KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_116_pcgkebcl_2023-07-19T16:53:11.416Z"); - String jobName2 = KubernetesOverlordUtils.convertTaskIdToJobName("coordinator-issued_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_117_pcgkebcl_2023-07-19T16:53:11.416Z"); + String jobName1 = KubernetesOverlordUtils.convertTaskIdToJobName("auto_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_116_pcgkebcl_2023-07-19T16:53:11.416Z"); + String jobName2 = KubernetesOverlordUtils.convertTaskIdToJobName("auto_compact_1234_telemetry_wikipedia_geteditfailuresinnorthamerica_agg_summ_117_pcgkebcl_2023-07-19T16:53:11.416Z"); Assertions.assertNotEquals(jobName1, jobName2); } diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java index ffbc7b992e2e..8fd23da5e15e 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManagerConfig.java @@ -51,7 +51,7 @@ public SegmentsMetadataManagerConfig( ) { this.pollDuration = Configs.valueOrDefault(pollDuration, Period.minutes(1)); - this.useIncrementalCache = 
Configs.valueOrDefault(useIncrementalCache, SegmentMetadataCache.UsageMode.IF_SYNCED); this.killUnused = Configs.valueOrDefault(killUnused, new UnusedSegmentKillerConfig(null, null, null)); if (this.killUnused.isEnabled() && this.useIncrementalCache == SegmentMetadataCache.UsageMode.NEVER) { throw DruidException diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java index 08e543504a6a..5de14f594f48 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/ClusterCompactionConfig.java @@ -61,7 +61,7 @@ public ClusterCompactionConfig( this.maxCompactionTaskSlots = Configs.valueOrDefault(maxCompactionTaskSlots, Integer.MAX_VALUE); this.compactionPolicy = Configs.valueOrDefault(compactionPolicy, DEFAULT_COMPACTION_POLICY); this.engine = Configs.valueOrDefault(engine, CompactionEngine.NATIVE); - this.useSupervisors = Configs.valueOrDefault(useSupervisors, false); + this.useSupervisors = Configs.valueOrDefault(useSupervisors, true); this.storeCompactionStatePerSegment = Configs.valueOrDefault( storeCompactionStatePerSegment, true @@ -158,4 +158,77 @@ public String toString() ", storeCompactionStatePerSegment=" + storeCompactionStatePerSegment + '}'; } + + public Builder toBuilder() + { + return new Builder().compactionTaskSlotRatio(compactionTaskSlotRatio) + .maxCompactionTaskSlots(maxCompactionTaskSlots) + .compactionPolicy(compactionPolicy) + .useSupervisors(useSupervisors) + .engine(engine) + .storeCompactionStatePerSegment(storeCompactionStatePerSegment); + } + + public static Builder builder() + { + return new Builder(); + } + + public static class Builder + { + private Double compactionTaskSlotRatio; + private Integer maxCompactionTaskSlots;
+ private CompactionCandidateSearchPolicy compactionPolicy; + private Boolean useSupervisors; + private CompactionEngine engine; + private Boolean storeCompactionStatePerSegment; + + public Builder compactionTaskSlotRatio(double compactionTaskSlotRatio) + { + this.compactionTaskSlotRatio = compactionTaskSlotRatio; + return this; + } + + public Builder maxCompactionTaskSlots(int maxCompactionTaskSlots) + { + this.maxCompactionTaskSlots = maxCompactionTaskSlots; + return this; + } + + public Builder compactionPolicy(CompactionCandidateSearchPolicy compactionPolicy) + { + this.compactionPolicy = compactionPolicy; + return this; + } + + public Builder useSupervisors(boolean useSupervisors) + { + this.useSupervisors = useSupervisors; + return this; + } + + public Builder engine(CompactionEngine engine) + { + this.engine = engine; + return this; + } + + public Builder storeCompactionStatePerSegment(boolean storeCompactionStatePerSegment) + { + this.storeCompactionStatePerSegment = storeCompactionStatePerSegment; + return this; + } + + public ClusterCompactionConfig build() + { + return new ClusterCompactionConfig( + compactionTaskSlotRatio, + maxCompactionTaskSlots, + compactionPolicy, + useSupervisors, + engine, + storeCompactionStatePerSegment + ); + } + } } diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java index 3af188d9a03a..54ca14ae260c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCompactionConfig.java @@ -42,6 +42,9 @@ public class DruidCompactionConfig private static final DruidCompactionConfig EMPTY_INSTANCE = new DruidCompactionConfig(List.of(), null, null, null, null, null, null); + private static final DruidCompactionConfig LEGACY_RUN_AS_COORDINATOR_DUTY = + new DruidCompactionConfig(List.of(), null, null, null, 
false, CompactionEngine.NATIVE, null); + private final List compactionConfigs; private final ClusterCompactionConfig clusterConfig; @@ -79,6 +82,11 @@ public static DruidCompactionConfig empty() return EMPTY_INSTANCE; } + public static DruidCompactionConfig legacy() + { + return LEGACY_RUN_AS_COORDINATOR_DUTY; + } + @JsonCreator public DruidCompactionConfig( @JsonProperty("compactionConfigs") List compactionConfigs, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java index e60c512e6a5b..cbb72762651c 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/duty/CompactSegments.java @@ -94,7 +94,7 @@ public class CompactSegments implements CoordinatorCustomDuty private static final Logger LOG = new Logger(CompactSegments.class); - private static final String TASK_ID_PREFIX = "coordinator-issued"; + private static final String TASK_ID_PREFIX = "auto"; private final CompactionStatusTracker statusTracker; private final OverlordClient overlordClient; diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ClusterCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ClusterCompactionConfigTest.java new file mode 100644 index 000000000000..74dcd21f036e --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/ClusterCompactionConfigTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexer.CompactionEngine; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.server.compaction.NewestSegmentFirstPolicy; +import org.junit.Assert; +import org.junit.Test; + +public class ClusterCompactionConfigTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + + @Test + public void testDefaults() + { + ClusterCompactionConfig config = new ClusterCompactionConfig(null, null, null, null, null, null); + + Assert.assertEquals(0.1, config.getCompactionTaskSlotRatio(), 0.0001); + Assert.assertEquals(Integer.MAX_VALUE, config.getMaxCompactionTaskSlots()); + Assert.assertTrue(config.isUseSupervisors()); + Assert.assertEquals(CompactionEngine.NATIVE, config.getEngine()); + Assert.assertNotNull(config.getCompactionPolicy()); + Assert.assertTrue(config.isStoreCompactionStatePerSegment()); + } + + @Test + public void testSerde() throws Exception + { + ClusterCompactionConfig config = ClusterCompactionConfig.builder() + .compactionTaskSlotRatio(0.5) + .maxCompactionTaskSlots(10) + .useSupervisors(false) + .engine(CompactionEngine.NATIVE) + .storeCompactionStatePerSegment(false) + .build(); + + String json = MAPPER.writeValueAsString(config); + ClusterCompactionConfig deserialized = MAPPER.readValue(json, ClusterCompactionConfig.class); + + Assert.assertEquals(config, deserialized); + } + + @Test + public void testBuilder() + { 
+ NewestSegmentFirstPolicy policy = new NewestSegmentFirstPolicy(null); + ClusterCompactionConfig config = ClusterCompactionConfig.builder() + .compactionTaskSlotRatio(0.3) + .maxCompactionTaskSlots(5) + .useSupervisors(true) + .engine(CompactionEngine.MSQ) + .compactionPolicy(policy) + .storeCompactionStatePerSegment(true) + .build(); + + Assert.assertEquals(0.3, config.getCompactionTaskSlotRatio(), 0.0001); + Assert.assertEquals(5, config.getMaxCompactionTaskSlots()); + Assert.assertTrue(config.isUseSupervisors()); + Assert.assertEquals(CompactionEngine.MSQ, config.getEngine()); + Assert.assertEquals(policy, config.getCompactionPolicy()); + Assert.assertTrue(config.isStoreCompactionStatePerSegment()); + } + + @Test + public void testToBuilder() + { + ClusterCompactionConfig original = ClusterCompactionConfig.builder() + .compactionTaskSlotRatio(0.7) + .maxCompactionTaskSlots(20) + .useSupervisors(false) + .engine(CompactionEngine.NATIVE) + .storeCompactionStatePerSegment(false) + .build(); + + ClusterCompactionConfig modified = original.toBuilder() + .compactionTaskSlotRatio(0.8) + .build(); + + Assert.assertEquals(0.8, modified.getCompactionTaskSlotRatio(), 0.0001); + Assert.assertEquals(20, modified.getMaxCompactionTaskSlots()); + Assert.assertFalse(modified.isUseSupervisors()); + } + + @Test + public void testMsqEngineRequiresSupervisors() + { + DruidException e = Assert.assertThrows( + DruidException.class, + () -> ClusterCompactionConfig.builder().useSupervisors(false).engine(CompactionEngine.MSQ).build() + ); + Assert.assertEquals("MSQ Compaction engine can be used only with compaction supervisors.", e.getMessage()); + Assert.assertEquals(DruidException.Category.INVALID_INPUT, e.getCategory()); + } + + @Test + public void testEqualsAndHashCode() + { + ClusterCompactionConfig config1 = ClusterCompactionConfig.builder() + .compactionTaskSlotRatio(0.5) + .maxCompactionTaskSlots(10) + .build(); + + ClusterCompactionConfig config2 = 
ClusterCompactionConfig.builder() + .compactionTaskSlotRatio(0.5) + .maxCompactionTaskSlots(10) + .build(); + + ClusterCompactionConfig config3 = ClusterCompactionConfig.builder() + .compactionTaskSlotRatio(0.6) + .maxCompactionTaskSlots(10) + .build(); + + Assert.assertEquals(config1, config2); + Assert.assertEquals(config1.hashCode(), config2.hashCode()); + Assert.assertNotEquals(config1, config3); + Assert.assertNotEquals(config3, config2); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java index 8fae5623db41..295201116422 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCompactionConfigTest.java @@ -45,6 +45,13 @@ public void testSerdeDefaultConfig() throws Exception Assert.assertEquals(defaultConfig, deserialized); } + @Test + public void testSerdeWithLegacyConfig() throws Exception + { + final String json = "{\"compactionConfigs\":[],\"useSupervisors\":false,\"engine\":\"native\"}"; + Assert.assertEquals(DruidCompactionConfig.legacy(), MAPPER.readValue(json, DruidCompactionConfig.class)); + } + @Test public void testSerdeWithDatasourceConfigs() throws Exception { @@ -118,6 +125,7 @@ public void testDefaultConfigValues() Assert.assertTrue(config.getCompactionConfigs().isEmpty()); Assert.assertTrue(config.getCompactionPolicy() instanceof NewestSegmentFirstPolicy); Assert.assertEquals(CompactionEngine.NATIVE, config.getEngine()); + Assert.assertTrue(config.isUseSupervisors()); Assert.assertEquals(0.1, config.getCompactionTaskSlotRatio(), 1e-9); Assert.assertEquals(Integer.MAX_VALUE, config.getMaxCompactionTaskSlots()); Assert.assertTrue(config.isStoreCompactionStatePerSegment()); diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java 
b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java index 75aefc822891..e96d800bec87 100644 --- a/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java +++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorCompactionConfigsResourceTest.java @@ -105,7 +105,7 @@ public void testGetDefaultClusterConfig() Assert.assertEquals(0.1, defaultConfig.getCompactionTaskSlotRatio(), DELTA); Assert.assertEquals(Integer.MAX_VALUE, defaultConfig.getMaxCompactionTaskSlots()); Assert.assertTrue(defaultConfig.getCompactionConfigs().isEmpty()); - Assert.assertFalse(defaultConfig.isUseSupervisors()); + Assert.assertTrue(defaultConfig.isUseSupervisors()); Assert.assertEquals(CompactionEngine.NATIVE, defaultConfig.getEngine()); }