diff --git a/clients/client-python/gravitino/api/policy/__init__.py b/clients/client-python/gravitino/api/policy/__init__.py new file mode 100644 index 00000000000..ee151351538 --- /dev/null +++ b/clients/client-python/gravitino/api/policy/__init__.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from gravitino.api.policy.iceberg_data_compaction_content import ( + IcebergDataCompactionContent, +) +from gravitino.api.policy.policy import Policy +from gravitino.api.policy.policy_change import ( + PolicyChange, + RenamePolicy, + UpdateContent, + UpdatePolicyComment, +) +from gravitino.api.policy.policy_content import PolicyContent +from gravitino.api.policy.policy_contents import PolicyContents + +__all__ = [ + "Policy", + "PolicyContent", + "PolicyChange", + "RenamePolicy", + "UpdatePolicyComment", + "UpdateContent", + "PolicyContents", + "IcebergDataCompactionContent", +] diff --git a/clients/client-python/gravitino/api/policy/iceberg_data_compaction_content.py b/clients/client-python/gravitino/api/policy/iceberg_data_compaction_content.py new file mode 100644 index 00000000000..0fa5c7afbd6 --- /dev/null +++ b/clients/client-python/gravitino/api/policy/iceberg_data_compaction_content.py @@ -0,0 +1,308 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import re +from types import MappingProxyType +from typing import Any, Final, Mapping + +from gravitino.api.metadata_object import MetadataObject +from gravitino.api.policy.policy_content import PolicyContent +from gravitino.utils import StringUtils +from gravitino.utils.precondition import Precondition + + +class IcebergDataCompactionContent(PolicyContent): + """ + Built-in policy content for Iceberg compaction strategy. + """ + + # Property key for strategy type. + STRATEGY_TYPE_KEY: Final[str] = "strategy.type" + + # Strategy type value for iceberg data compaction. + STRATEGY_TYPE_VALUE: Final[str] = "iceberg-data-compaction" + + # Property key for job template name. + JOB_TEMPLATE_NAME_KEY: Final[str] = "job.template-name" + + # Built-in job template name for Iceberg rewrite data files. + JOB_TEMPLATE_NAME_VALUE: Final[str] = "builtin-iceberg-rewrite-data-files" + + # Prefix for rewrite options propagated to job options. + JOB_OPTIONS_PREFIX: Final[str] = "job.options." + + # Rule key for trigger expression. + TRIGGER_EXPR_KEY: Final[str] = "trigger-expr" + + # Rule key for score expression. + SCORE_EXPR_KEY: Final[str] = "score-expr" + + # Rule key for minimum data file MSE threshold. + MIN_DATA_FILE_MSE_KEY: Final[str] = "minDataFileMse" + + # Rule key for minimum delete file count threshold. + MIN_DELETE_FILE_NUMBER_KEY: Final[str] = "minDeleteFileNumber" + + # Rule key for data file MSE score weight. + DATA_FILE_MSE_WEIGHT_KEY: Final[str] = "dataFileMseWeight" + + # Rule key for delete file number score weight. + DELETE_FILE_NUMBER_WEIGHT_KEY: Final[str] = "deleteFileNumberWeight" + + # Rule key for max partition number selected for compaction. + MAX_PARTITION_NUM_KEY: Final[str] = "max-partition-num" + + # Metric name for data file MSE. + DATA_FILE_MSE_METRIC: Final[str] = "custom-data-file-mse" + + # Metric name for delete file number. + DELETE_FILE_NUMBER_METRIC: Final[str] = "custom-delete-file-number" + + # Default minimum threshold for data file MSE metric, equals (128 MiB * 0.15)^2. + DEFAULT_MIN_DATA_FILE_MSE: Final[int] = 405323966463344 + + # Default minimum threshold for delete file number metric. + DEFAULT_MIN_DELETE_FILE_NUMBER: Final[int] = 1 + + # Default score weight for data file MSE. + DEFAULT_DATA_FILE_MSE_WEIGHT: Final[int] = 1 + + # Default score weight for delete file number. + DEFAULT_DELETE_FILE_NUMBER_WEIGHT: Final[int] = 100 + + # Default max partition number for compaction. + DEFAULT_MAX_PARTITION_NUM: Final[int] = 50 + + # Default rewrite options for Iceberg rewrite data files. + DEFAULT_REWRITE_OPTIONS: Mapping[str, str] = MappingProxyType({}) + + OPTION_KEY_PATTERN: Final[re.Pattern[str]] = re.compile(r"[A-Za-z0-9._-]+") + + SUPPORTED_OBJECT_TYPES: set[MetadataObject.Type] = { + MetadataObject.Type.CATALOG, + MetadataObject.Type.SCHEMA, + MetadataObject.Type.TABLE, + } + + TRIGGER_EXPR: Final[str] = ( + f"{DATA_FILE_MSE_METRIC} >= {MIN_DATA_FILE_MSE_KEY} " + f"|| {DELETE_FILE_NUMBER_METRIC} >= {MIN_DELETE_FILE_NUMBER_KEY}" + ) + + SCORE_EXPR: Final[str] = ( + f"{DATA_FILE_MSE_METRIC} * {DATA_FILE_MSE_WEIGHT_KEY} " + f"+ {DELETE_FILE_NUMBER_METRIC} * {DELETE_FILE_NUMBER_WEIGHT_KEY}" + ) + + def __init__( + self, + min_data_file_mse: int, + min_delete_file_number: int, + data_file_mse_weight: int, + delete_file_number_weight: int, + max_partition_num: int, + rewrite_options: dict[str, str], + ) -> None: + super().__init__() + + self._min_data_file_mse = ( + min_data_file_mse + if min_data_file_mse is not None + else IcebergDataCompactionContent.DEFAULT_MIN_DATA_FILE_MSE + ) + self._min_delete_file_number = ( + min_delete_file_number + if min_delete_file_number is not None + else IcebergDataCompactionContent.DEFAULT_MIN_DELETE_FILE_NUMBER + ) + self._data_file_mse_weight = ( + data_file_mse_weight + if data_file_mse_weight is not None + else IcebergDataCompactionContent.DEFAULT_DATA_FILE_MSE_WEIGHT + ) + self._delete_file_number_weight = ( + delete_file_number_weight + if delete_file_number_weight is not None + else IcebergDataCompactionContent.DEFAULT_DELETE_FILE_NUMBER_WEIGHT + ) + self._max_partition_num = ( + max_partition_num + if max_partition_num is not None + else IcebergDataCompactionContent.DEFAULT_MAX_PARTITION_NUM + ) + self._rewrite_options = ( + MappingProxyType(rewrite_options) + if rewrite_options is not None + else IcebergDataCompactionContent.DEFAULT_REWRITE_OPTIONS + ) + + def __eq__(self, value) -> bool: + if not isinstance(value, IcebergDataCompactionContent): + return False + + return ( + self.min_data_file_mse == value.min_data_file_mse + and self.min_delete_file_number == value.min_delete_file_number + and self.data_file_mse_weight == value.data_file_mse_weight + and self.delete_file_number_weight == value.delete_file_number_weight + and self.max_partition_num == value.max_partition_num + and self.rewrite_options == value.rewrite_options + ) + + def __hash__(self): + return hash( + self._min_data_file_mse, + self._min_delete_file_number, + self._data_file_mse_weight, + self._delete_file_number_weight, + self._max_partition_num, + tuple(sorted(self._rewrite_options.items())), + ) + + def __str__(self) -> str: + return ( + "IcebergDataCompactionContent(" + f"min_data_file_mse={self.min_data_file_mse}, " + f"min_delete_file_number={self.min_delete_file_number}, " + f"data_file_mse_weight={self.data_file_mse_weight}, " + f"delete_file_number_weight={self.delete_file_number_weight}, " + f"max_partition_num={self.max_partition_num}, " + f"rewrite_options={dict(self.rewrite_options)}" + ")" + ) + + @property + def min_data_file_mse(self) -> int: + """ + Returns the minimum threshold for "custom-data-file-mse". + + Returns: + int: minimum data file MSE threshold + """ + return self._min_data_file_mse + + @property + def min_delete_file_number(self) -> int: + """ + Returns the minimum threshold for "custom-delete-file-number". + + Returns: + int: minimum delete file number threshold + """ + return self._min_delete_file_number + + @property + def data_file_mse_weight(self) -> int: + """ + Returns the weight used by "custom-data-file-mse" in score expression. + + Returns: + int: data file MSE score weight + """ + return self._data_file_mse_weight + + @property + def delete_file_number_weight(self) -> int: + """ + Returns the weight used by "custom-delete-file-number" in score expression. + + Returns: + int: delete file number score weight + """ + return self._delete_file_number_weight + + @property + def max_partition_num(self) -> int: + """ + Returns max partition number selected for compaction. + + Returns: + int: max partition number + """ + return self._max_partition_num + + @property + def rewrite_options(self) -> Mapping[str, str]: + """ + Returns rewrite options that are expanded to job.options.* rule entries. + + Returns: + Mapping[str, str]: The rewrite options + """ + return self._rewrite_options + + def supported_object_types(self) -> set[MetadataObject.Type]: + return IcebergDataCompactionContent.SUPPORTED_OBJECT_TYPES + + def properties(self) -> dict[str, str]: + # properties keep stable strategy/job identity; thresholds and scoring knobs belong to rules. + return { + IcebergDataCompactionContent.STRATEGY_TYPE_KEY: IcebergDataCompactionContent.STRATEGY_TYPE_VALUE, + IcebergDataCompactionContent.JOB_TEMPLATE_NAME_KEY: IcebergDataCompactionContent.JOB_TEMPLATE_NAME_VALUE, + } + + def rules(self) -> dict[str, Any]: + rewrite_rules: dict[str, Any] = dict(self._rewrite_options.items()) + + return { + IcebergDataCompactionContent.MIN_DATA_FILE_MSE_KEY: self.min_data_file_mse, + IcebergDataCompactionContent.MIN_DELETE_FILE_NUMBER_KEY: self.min_delete_file_number, + IcebergDataCompactionContent.DATA_FILE_MSE_WEIGHT_KEY: self.data_file_mse_weight, + IcebergDataCompactionContent.DELETE_FILE_NUMBER_WEIGHT_KEY: self.delete_file_number_weight, + IcebergDataCompactionContent.MAX_PARTITION_NUM_KEY: self.max_partition_num, + IcebergDataCompactionContent.TRIGGER_EXPR_KEY: self.TRIGGER_EXPR, + IcebergDataCompactionContent.SCORE_EXPR_KEY: self.SCORE_EXPR, + **rewrite_rules, + } + + def validate(self) -> None: + Precondition.check_argument( + self.min_data_file_mse >= 0, + "minDataFileMse must be >= 0", + ) + Precondition.check_argument( + self.min_delete_file_number >= 0, + "minDeleteFileNumber must be >= 0", + ) + Precondition.check_argument( + self.data_file_mse_weight >= 0, + "dataFileMseWeight must be >= 0", + ) + Precondition.check_argument( + self._max_partition_num > 0, + "maxPartitionNum must be > 0", + ) + + for k, v in self._rewrite_options.items(): + Precondition.check_argument( + StringUtils.is_not_blank(k), + "rewrite option key is blank", + ) + Precondition.check_argument( + IcebergDataCompactionContent.OPTION_KEY_PATTERN.fullmatch(k), + f"rewrite option key '{k}' contains illegal characters", + ) + Precondition.check_argument( + not k.startswith(IcebergDataCompactionContent.JOB_OPTIONS_PREFIX), + f"rewrite option key '{k}' must not start with {IcebergDataCompactionContent.JOB_OPTIONS_PREFIX}", + ) + Precondition.check_argument( + StringUtils.is_not_blank(v), + f"rewrite option '{k}' must have non-empty value", + ) diff --git a/clients/client-python/gravitino/api/policy/policy.py b/clients/client-python/gravitino/api/policy/policy.py new file mode 100644 index 00000000000..c6c4be28dab --- /dev/null +++ b/clients/client-python/gravitino/api/policy/policy.py @@ -0,0 +1,154 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from __future__ import annotations + +import typing as tp +from abc import ABC, abstractmethod + +from gravitino.api.auditable import Auditable +from gravitino.api.metadata_object import MetadataObject +from gravitino.api.policy.policy_content import PolicyContent +from gravitino.exceptions.base import UnsupportedOperationException + + +class Policy(Auditable, ABC): + """ + The interface of the policy. The policy is a set of rules that + can be associated with a metadata object. The policy can be used + for data governance and so on. + """ + + # The prefix for built-in policy types. All built-in policy types should start with this prefix. + BUILT_IN_TYPE_PREFIX = "system_" + + @abstractmethod + def name(self) -> str: + """ + Get the name of the policy. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + str: The name of the policy. + """ + raise NotImplementedError() + + @abstractmethod + def policy_type(self) -> str: + """ + Get the type of the policy. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + str: The type of the policy. + """ + raise NotImplementedError() + + @abstractmethod + def comment(self) -> str: + """ + Get the comment of the policy. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + str: The comment of the policy. + """ + raise NotImplementedError() + + @abstractmethod + def enabled(self) -> bool: + """ + Get whether the policy is enabled or not. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + bool: True if the policy is enabled, false otherwise. + """ + raise NotImplementedError() + + @abstractmethod + def content(self) -> PolicyContent: + """ + Get the content of the policy. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + PolicyContent: The content of the policy. + """ + raise NotImplementedError() + + @abstractmethod + def inherited(self) -> tp.Optional[bool]: + """ + Check if the policy is inherited from a parent object or not. + Note: The return value is optional, Only when the policy is associated + with a metadata object, and called from the metadata object, + the return value will be present. Otherwise, it will be empty. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + Optional[bool]: True if the policy is inherited, + false if it is owned by the object itself. + Empty if the policy is not associated with any object. + """ + raise NotImplementedError() + + @abstractmethod + def associated_objects(self) -> AssociatedObjects: + raise UnsupportedOperationException( + "The associatedObjects method is not supported." + ) + + +class AssociatedObjects(ABC): + """ + The interface of the associated objects of the policy. + """ + + @abstractmethod + def count(self) -> int: + """ + _summary_ + + Returns: + int: _description_ + """ + objects = self.objects() + return len(objects) if objects is not None else 0 + + @abstractmethod + def objects(self) -> list[MetadataObject]: + """ + Get the list of metadata objects that are associated with this policy. + + Returns: + list[MetadataObject]: The list of objects that are associated with this policy. + """ + pass diff --git a/clients/client-python/gravitino/api/policy/policy_change.py b/clients/client-python/gravitino/api/policy/policy_change.py new file mode 100644 index 00000000000..36923f27d4a --- /dev/null +++ b/clients/client-python/gravitino/api/policy/policy_change.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import typing as tp +from dataclasses import dataclass, field + +from dataclasses_json import config + +from gravitino.api.policy.policy_content import PolicyContent + + +class PolicyChange: + """ + Interface for supporting policy changes. + This interface will be used to provide policy modification operations for each policy. + """ + + @staticmethod + def rename(new_name: str) -> PolicyChange: + """ + Creates a new policy change to rename the policy. + + Args: + new_name (str): The new name for the policy. + + Returns: + PolicyChange: The policy change. + """ + return RenamePolicy(new_name) + + @staticmethod + def update_comment(new_comment: str) -> PolicyChange: + """ + Creates a new policy change to update the policy comment. + + Args: + new_comment (str): The new comment for the policy. + + Returns: + PolicyChange: The policy change. + """ + return UpdatePolicyComment(new_comment) + + @staticmethod + def update_content(policy_type: str, new_content: PolicyContent) -> PolicyChange: + """ + Creates a new policy change to update the content of the policy. + + Args: + policy_type (str): The type of the policy, used for validation. + new_content (PolicyContent): The new content for the policy. + + Returns: + UpdateContent: The policy change. + """ + return UpdateContent(policy_type, new_content) + + +@tp.final +@dataclass(frozen=True) +class RenamePolicy(PolicyChange): + _new_name: str = field(metadata=config(field_name="newName")) + + def __str__(self) -> str: + return f"RENAME POLICY {self._new_name}" + + @property + def new_name(self) -> str: + return self._new_name + + +@tp.final +@dataclass(frozen=True) +class UpdatePolicyComment(PolicyChange): + _new_comment: str = field(metadata=config(field_name="newComment")) + + def __str__(self) -> str: + return f"UPDATE POLICY COMMENT {self._new_comment}" + + @property + def new_comment(self) -> str: + return self._new_comment + + +@tp.final +@dataclass(frozen=True) +class UpdateContent(PolicyChange): + _policy_type: str = field(metadata=config(field_name="policyType")) + _new_content: PolicyContent = field(metadata=config(field_name="newContent")) + + def __str__(self) -> str: + return ( + "UPDATE POLICY CONTENT " + + "policyType=" + + f"{self._policy_type}" + + ", content=" + + f"{self._new_content}" + ) + + @property + def new_content(self) -> PolicyContent: + return self._new_content + + @property + def policy_type(self) -> str: + return self._policy_type diff --git a/clients/client-python/gravitino/api/policy/policy_content.py b/clients/client-python/gravitino/api/policy/policy_content.py new file mode 100644 index 00000000000..b48bd7c5142 --- /dev/null +++ b/clients/client-python/gravitino/api/policy/policy_content.py @@ -0,0 +1,79 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +import typing as tp +from abc import ABC, abstractmethod + +from gravitino.api.metadata_object import MetadataObject +from gravitino.exceptions.base import UnsupportedOperationException +from gravitino.utils.precondition import Precondition + + +class PolicyContent(ABC): + """ + The interface of the content of the policy. + """ + + @abstractmethod + def supported_object_types(self) -> set[MetadataObject.Type]: + """ + Return the set of metadata object types that the policy can be applied to + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + set[MetadataObject.Type]: the set of metadata object types that the policy can be applied to + """ + raise NotImplementedError() + + @abstractmethod + def properties(self) -> dict[str, str]: + """ + Return the additional properties of the policy. + + Raises: + NotADirectoryError: If the method is not implemented. + + Returns: + dict[str, str]: The additional properties of the policy. + """ + raise NotADirectoryError() + + def rules(self) -> dict[str, tp.Any]: + """ + A convenience method to get all rules in the policy content. + + Raises: + UnsupportedOperationException: If it doesn't support get all rules. + + Returns: + dict[str, tp.Any]: A map of rule names to their corresponding rule objects. + """ + raise UnsupportedOperationException("Does support get all rules.") + + def validate(self) -> None: + """ + Validates the policy content. + """ + support_types = self.supported_object_types() + Precondition.check_argument( + support_types is not None and len(support_types) > 0, + "The supported object types of the policy cannot be empty.", + ) diff --git a/clients/client-python/gravitino/api/policy/policy_contents.py b/clients/client-python/gravitino/api/policy/policy_contents.py new file mode 100644 index 00000000000..db869ca52f7 --- /dev/null +++ b/clients/client-python/gravitino/api/policy/policy_contents.py @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import typing as tp + +from gravitino.api.metadata_object import MetadataObject +from gravitino.api.policy.iceberg_data_compaction_content import ( + IcebergDataCompactionContent, +) +from gravitino.api.policy.policy_content import PolicyContent + + +class PolicyContents: + """ + Utility class for creating instances of PolicyContent. + """ + + @staticmethod + def custom( + rules: dict[str, tp.Any], + supported_object_types: set[MetadataObject.Type], + properties: dict[str, str], + ) -> PolicyContent: + """ + Creates a custom policy content with the given rules and properties. + + Args: + rules (dict[str, tp.Any]): The custom rules of the policy. + supported_object_types (set[MetadataObject.Type]): The set of metadata object types + that the policy can be applied to. + properties (dict[str, str]): The additional properties of the policy. + + Returns: + PolicyContent: A new instance of PolicyContent with the specified rules and properties. + """ + return CustomContent(rules, supported_object_types, properties) + + @staticmethod + def iceberg_data_compaction( + min_data_file_mse: int = IcebergDataCompactionContent.DEFAULT_MIN_DATA_FILE_MSE, + min_delete_file_number: int = IcebergDataCompactionContent.DEFAULT_MIN_DELETE_FILE_NUMBER, + data_file_mse_weight=IcebergDataCompactionContent.DEFAULT_DATA_FILE_MSE_WEIGHT, + delete_file_number_weight=IcebergDataCompactionContent.DEFAULT_DELETE_FILE_NUMBER_WEIGHT, + max_partition_num=IcebergDataCompactionContent.DEFAULT_MAX_PARTITION_NUM, + rewrite_options: tp.Mapping[str, str] | None = None, + ) -> IcebergDataCompactionContent: + if rewrite_options is None: + rewrite_options = dict(IcebergDataCompactionContent.DEFAULT_REWRITE_OPTIONS) + + return IcebergDataCompactionContent( + min_data_file_mse, + min_delete_file_number, + data_file_mse_weight, + delete_file_number_weight, + max_partition_num, + rewrite_options, + ) + + +class CustomContent(PolicyContent): + """ + A custom content implementation of PolicyContent that holds custom rules and properties. + """ + + def __init__( + self, + custom_rules: dict[str, tp.Any], + supported_object_types: set[MetadataObject.Type], + properties: dict[str, str], + ) -> None: + super().__init__() + self._custom_rules = custom_rules + self._supported_object_types = set(supported_object_types) + self._properties = properties + + def __hash__(self): + return hash( + self._custom_rules, + self._properties, + self._supported_object_types, + ) + + def __eq__(self, other) -> bool: + if not isinstance(other, CustomContent): + return False + return ( + self._custom_rules == other._custom_rules + and self._properties == other._properties + and self._supported_object_types == other._supported_object_types + ) + + def __str__(self) -> str: + return ( + "CustomContent{" + + f"customRules={self._custom_rules}" + + f", properties={self._properties}" + + f", supportedObjectTypes={self._supported_object_types}" + + "}" + ) + + @property + def custom_rules(self) -> dict[str, tp.Any]: + """ + Get the custom rules of the policy content. + + Returns: + dict[str, tp.Any]: The custom rules of the policy content. + """ + return self._custom_rules + + def supported_object_types(self) -> set[MetadataObject.Type]: + return self._supported_object_types + + def properties(self) -> dict[str, str]: + return self._properties + + def rules(self) -> dict[str, tp.Any]: + return self._custom_rules diff --git a/clients/client-python/gravitino/api/policy/policy_operations.py b/clients/client-python/gravitino/api/policy/policy_operations.py new file mode 100644 index 00000000000..3258bcc48d2 --- /dev/null +++ b/clients/client-python/gravitino/api/policy/policy_operations.py @@ -0,0 +1,140 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from abc import ABC, abstractmethod + +from gravitino.api.policy import Policy, PolicyContent + + +class PolicyOperations(ABC): + """ + The interface of the policy operations. The policy operations + are used to manage policies under a metalake. This interface will be + mixed with GravitinoMetalake or GravitinoClient to provide policy operations. + """ + + @abstractmethod + async def list_policies(self) -> list[str]: + """ + List all the policy names under a metalake. + + Raises: + NotImplementedError: If the method is not implemented. + NoSuchMetalakeException: If the metalake does not exist. + + Returns: + list[str]: The list of policy names under the metalake. + """ + raise NotImplementedError() + + @abstractmethod + async def list_policy_infos(self) -> list[Policy]: + """ + List all the policies with detailed information under a metalake. + + Raises: + NotImplementedError: If the method is not implemented. + NoSuchMetalakeException: If the metalake does not exist. + + Returns: + list[Policy]: The list of policies with details under the metalake. + """ + raise NotImplementedError() + + @abstractmethod + async def get_policy(self, policy_name: str) -> Policy: + """ + Get a policy by its name under a metalake. + + Args: + policy_name (str): The name of the policy. + + Raises: + NotImplementedError: If the method is not implemented. + NoSuchMetalakeException: If the metalake does not exist. + + Returns: + Policy: The policy instance. + """ + raise NotImplementedError() + + @abstractmethod + async def create_policy( + self, + name: str, + policy_type: str, + comment: str, + enabled: bool, + content: PolicyContent, + ) -> Policy: + raise NotImplementedError() + + @abstractmethod + async def enable_policy(self, policy_name: str) -> None: + """ + Enable a policy under a metalake. If the policy is already enabled, this method does nothing. + + Args: + policy_name (str): The name of the policy to enable. + + Raises: + NotImplementedError: If the method is not implemented. + NoSuchPolicyException: If the policy does not exist. + """ + raise NotImplementedError() + + @abstractmethod + async def disable_policy(self, policy_name: str) -> None: + """ + Disable a policy under a metalake. If the policy is already disabled, this method does nothing. + + Args: + policy_name (str): The name of the policy to disable. + + Raises: + NotImplementedError: If the method is not implemented. + NoSuchPolicyException: If the policy does not exist. + """ + raise NotImplementedError() + + @abstractmethod + async def alter_policy( + self, + policy_name: str, + *changes, + ) -> Policy: + # todo + raise NotImplementedError() + + @abstractmethod + async def delete_policy(self, policy_name: str) -> bool: + """ + Delete a policy under a metalake. + + Args: + policy_name (str): The name of the policy to delete. + + Raises: + NotImplementedError: If the method is not implemented. + NoSuchPolicyException: If the policy does not exist. + + Returns: + bool: True if the policy is deleted successfully, False otherwise. + """ + raise NotImplementedError() diff --git a/clients/client-python/gravitino/api/policy/supports_policies.py b/clients/client-python/gravitino/api/policy/supports_policies.py new file mode 100644 index 00000000000..5504d0a0599 --- /dev/null +++ b/clients/client-python/gravitino/api/policy/supports_policies.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from __future__ import annotations + +from abc import ABC, abstractmethod + +from gravitino.api.policy import Policy + + +class SupportsPolicies(ABC): + """The interface for supporting getting or associating + policies with a metadata object. This interface will be mixed + with metadata objects to provide policy operations.""" + + @abstractmethod + def list_policies(self) -> list[str]: + """ + List all the policy names for the specific object. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + list[str]: The policy name list for the specific object. + """ + raise NotImplementedError() + + @abstractmethod + def list_policy_infos(self) -> list[Policy]: + """ + List all the policies with details for the specific object. + + Raises: + NotImplementedError: If the method is not implemented. + + Returns: + list[Policy]: the policy list with details for the specific object. + """ + raise NotImplementedError() + + @abstractmethod + def get_policy(self, policy_name: str) -> Policy: + """ + Get a policy by its name for the specific object. + + Args: + policy_name (str): The name of the policy. + + Raises: + NotImplementedError: If the method is not implemented. + NoSuchPolicyException: If the policy with the specific name does not exist. + + Returns: + Policy: The policy. + """ + raise NotImplementedError() + + @abstractmethod + def associate_policies( + self, + policies_to_add: list[str], + policies_to_remove: list[str], + ) -> list[str]: + """ + Associate policies to the specific object. + The policiesToAdd will be applied to the object and the policiesToRemove + will be removed from the object. Note that: + 1. Adding or removing policies that are not existed will be ignored. + 2. If the same name policy is in both policiesToAdd and policiesToRemove, + it will be ignored. + 3. If the policy is already applied to the object, it will + throw PolicyAlreadyAssociatedException + + Args: + policies_to_add (list[str]): The policy name list to be added to the specific object. + policies_to_remove (list[str]): The policy name list to be removed from the specific object. + + Raises: + NotImplementedError: If the method is not implemented. + PolicyAlreadyAssociatedException: If the policy is already applied to the object. + + Returns: + list[str]: The list of applied policies. + """ + raise NotImplementedError() diff --git a/clients/client-python/tests/unittests/api/policy/__init__.py b/clients/client-python/tests/unittests/api/policy/__init__.py new file mode 100644 index 00000000000..13a83393a91 --- /dev/null +++ b/clients/client-python/tests/unittests/api/policy/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/clients/client-python/tests/unittests/api/policy/test_policy_change.py b/clients/client-python/tests/unittests/api/policy/test_policy_change.py new file mode 100644 index 00000000000..b012030a683 --- /dev/null +++ b/clients/client-python/tests/unittests/api/policy/test_policy_change.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from gravitino.api.policy import PolicyChange, RenamePolicy, UpdatePolicyComment + + +class TestPolicyChange(unittest.TestCase): + def test_rename_policy(self) -> None: + change1, change2 = ( + PolicyChange.rename(f"new_policy_name{i + 1}") for i in range(2) + ) + change3 = PolicyChange.rename("new_policy_name1") + + self.assertTrue(isinstance(change1, RenamePolicy)) + + self.assertNotEqual(change1, change2) + self.assertNotEqual(change2, change3) + self.assertEqual(change1, change3) + + self.assertNotEqual(hash(change1), hash(change2)) + self.assertNotEqual(hash(change2), hash(change3)) + self.assertEqual(hash(change1), hash(change3)) + + self.assertEqual("new_policy_name1", change1.new_name()) + self.assertEqual("RENAME POLICY new_policy_name1", str(change1)) + + def test_update_comment_for_policy(self) -> None: + change1, change2 = PolicyChange.update_comment( + f"new_comment{i + 1}" for i in range(2) + ) + change3 = PolicyChange.update_comment("new_comment1") + + self.assertTrue(isinstance(change1, UpdatePolicyComment)) + + self.assertNotEqual(change1, change2) + self.assertNotEqual(change2, change3) + self.assertEqual(change1, change3) + + self.assertNotEqual(hash(change1), hash(change2)) + self.assertNotEqual(hash(change2), hash(change3)) + self.assertEqual(hash(change1), hash(change3)) + + self.assertEqual("new_comment1", change1.new_comment()) + self.assertEqual("UPDATE POLICY COMMENT new_comment1", str(change1)) + + def test_update_content_for_policy(self) -> None: + new_content1 = {"key": "value1"} + new_content2 = {"key": "value2"} + change1, change2 = ( + PolicyChange.update_content("policy_type", new_content1), + PolicyChange.update_content("policy_type", new_content2), + ) + change3 = PolicyChange.update_content("policy_type", new_content1) + + self.assertTrue(isinstance(change1, PolicyChange)) + + self.assertNotEqual(change1, change2) + self.assertNotEqual(change2, change3) + self.assertEqual(change1, change3) + + self.assertNotEqual(hash(change1), hash(change2)) + self.assertNotEqual(hash(change2), hash(change3)) + self.assertEqual(hash(change1), hash(change3)) + + self.assertEqual("policy_type", change1.policy_type()) + self.assertEqual(new_content1, change1.new_content()) + self.assertEqual( + "UPDATE POLICY CONTENT policy_type with content {'key': 'value1'}", + str(change1), + ) diff --git a/clients/client-python/tests/unittests/api/policy/test_policy_contents.py b/clients/client-python/tests/unittests/api/policy/test_policy_contents.py new file mode 100644 index 00000000000..7745cabc269 --- /dev/null +++ b/clients/client-python/tests/unittests/api/policy/test_policy_contents.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import unittest + +from gravitino.api.metadata_object import MetadataObject +from gravitino.api.policy import IcebergDataCompactionContent, PolicyContents +from gravitino.exceptions.base import IllegalArgumentException + + +class TestPolicyChange(unittest.TestCase): + def test_iceberg_compaction_content_uses_defaults(self) -> None: + content = PolicyContents.iceberg_data_compaction() + + self.assertEqual( + IcebergDataCompactionContent.DEFAULT_MIN_DATA_FILE_MSE, + content.rules().get(IcebergDataCompactionContent.MIN_DATA_FILE_MSE_KEY), + ) + self.assertEqual( + IcebergDataCompactionContent.DEFAULT_MIN_DELETE_FILE_NUMBER, + content.rules().get( + IcebergDataCompactionContent.MIN_DELETE_FILE_NUMBER_KEY + ), + ) + self.assertEqual( + 1, + content.rules().get(IcebergDataCompactionContent.DATA_FILE_MSE_WEIGHT_KEY), + ) + self.assertEqual( + 100, + content.rules().get( + IcebergDataCompactionContent.DELETE_FILE_NUMBER_WEIGHT_KEY + ), + ) + self.assertEqual( + 50, + content.rules().get(IcebergDataCompactionContent.MAX_PARTITION_NUM_KEY), + ) + self.assertIsNone(content.rules().get("job.options.target-file-size-bytes")) + self.assertIsNone(content.rules().get("job.options.min-input-files")) + self.assertIsNone(content.rules().get("job.options.delete-file-threshold")) + + hash(content) + + def test_iceberg_compaction_content_generates_optimizer_fields(self) -> None: + content = PolicyContents.iceberg_data_compaction( + 1000, 1, {"target-file-size-bytes": "1048576", "min-input-files": "1"} + ) + + self.assertEquals( + "iceberg-data-compaction", content.properties().get("strategy.type") + ) + self.assertEquals( + "builtin-iceberg-rewrite-data-files", + content.properties().get("job.template-name"), + ) + self.assertEquals(1000, content.rules().get("minDataFileMse")) + self.assertEquals(1, content.rules().get("minDeleteFileNumber")) + self.assertEquals(1, content.rules().get("dataFileMseWeight")) + self.assertEquals(100, content.rules().get("deleteFileNumberWeight")) + self.assertEquals(50, content.rules().get("max-partition-num")) + self.assertEquals( + "custom-data-file-mse >= minDataFileMse || custom-delete-file-number >= minDeleteFileNumber", + content.rules().get("trigger-expr"), + ) + + self.assertEquals( + "custom-data-file-mse * dataFileMseWeight" + + " + custom-delete-file-number * deleteFileNumberWeight", + content.rules().get("score-expr"), + ) + self.assertEquals( + "1048576", content.rules().get("job.options.target-file-size-bytes") + ) + self.assertEquals("1", content.rules().get("job.options.min-input-files")) + self.assertEquals( + { + MetadataObject.Type.CATALOG, + MetadataObject.Type.SCHEMA, + MetadataObject.Type.TABLE, + }, + content.supportedObjectTypes(), + ) + + def test_iceberg_compaction_content_supports_custom_weights(self) -> None: + content = PolicyContents.iceberg_data_compaction( + 1000, + 1, + 3, + 200, + 88, + { + "target-file-size-bytes": "1048576", + "min-input-files": "1", + }, + ) + + self.assertEquals(3, content.rules().get("dataFileMseWeight")) + self.assertEquals(200, content.rules().get("deleteFileNumberWeight")) + self.assertEquals(88, content.rules().get("max-partition-num")) + + with self.assertRaises(IllegalArgumentException): + content.validate() + + def test_iceberg_compaction_content_rejects_invalid_rewrite_option_key( + self, + ) -> None: + content = PolicyContents.iceberg_data_compaction( + 1000, + 1, + {"job.options.target-file-size-bytes": "1048576"}, + ) + + with self.assertRaises(IllegalArgumentException): + content.validate() + + def test_iceberg_compaction_content_rejects_invalid_max_partition_num(self): + content = PolicyContents.iceberg_data_compaction( + 1000, 1, 2, 10, 0, {"target-file-size-bytes": "1048576"} + ) + + with self.assertRaises(IllegalArgumentException): + content.validate()