diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql new file mode 100644 index 00000000000..7dc7f4b43c8 --- /dev/null +++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql @@ -0,0 +1,63 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Test Iceberg snapshot maintenance procedures via Gravitino connector + +CREATE SCHEMA IF NOT EXISTS gt_snapshot_test; + +CREATE TABLE gt_snapshot_test.maintenance_table ( + id int, + name varchar +); + +-- Insert data to create snapshots +INSERT INTO gt_snapshot_test.maintenance_table VALUES (1, 'alice'); + +INSERT INTO gt_snapshot_test.maintenance_table VALUES (2, 'bob'); + +INSERT INTO gt_snapshot_test.maintenance_table VALUES (3, 'charlie'); + +-- Verify we have multiple snapshots +SELECT count(*) >= 3 FROM "gt_snapshot_test"."maintenance_table$snapshots"; + +-- Test expire_snapshots procedure (expire snapshots older than a far future timestamp keeps all) +ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '0s'); + +-- Verify table data is still intact after expire_snapshots +SELECT count(*) FROM gt_snapshot_test.maintenance_table; + +-- Test remove_orphan_files procedure +ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE remove_orphan_files(retention_threshold => '0s'); + +-- Verify table data is still intact after remove_orphan_files +SELECT count(*) FROM gt_snapshot_test.maintenance_table; + +-- Insert more data to create small files for optimize (rewrite_data_files) +INSERT INTO gt_snapshot_test.maintenance_table VALUES (4, 'dave'); + +INSERT INTO gt_snapshot_test.maintenance_table VALUES (5, 'eve'); + +-- Test optimize (rewrite_data_files) table procedure +ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE optimize; + +-- Verify all data is still accessible after optimize +SELECT count(*) FROM gt_snapshot_test.maintenance_table; + +-- Cleanup +DROP TABLE gt_snapshot_test.maintenance_table; + +DROP SCHEMA gt_snapshot_test; diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt new file mode 100644 index 00000000000..e01de601c58 --- /dev/null +++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt @@ -0,0 +1,31 @@ +CREATE SCHEMA + +CREATE TABLE + +INSERT: 1 row + +INSERT: 1 row + +INSERT: 1 row + +"true" + +EXECUTE expire_snapshots + +"3" + +EXECUTE remove_orphan_files + +"3" + +INSERT: 1 row + +INSERT: 1 row + +EXECUTE optimize + +"5" + +DROP TABLE + +DROP SCHEMA diff --git a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java index 919d53f4a66..da6757a4f88 100644 --- a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java +++ b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java @@ -24,11 +24,13 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; +import java.util.Map; import java.util.Optional; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; @@ -50,6 +52,23 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata.getTableHandleForExecute( + session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, tableExecuteHandle); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java index afcf61bd606..1f8abb28bc7 100644 --- a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java +++ b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java @@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; @@ -52,6 +54,23 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata.getTableHandleForExecute( + session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, tableExecuteHandle); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java index 86345379c95..6bea446023d 100644 --- a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java +++ b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java @@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; @@ -52,6 +54,23 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata.getTableHandleForExecute( + session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, tableExecuteHandle); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java index ff43beb2908..01e1b819b4c 100644 --- a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java +++ b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java @@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; @@ -67,6 +69,23 @@ public Optional finishInsert( computedStatistics); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata.getTableHandleForExecute( + session, GravitinoHandle.unWrap(tableHandle), procedureName, executeProperties, retryMode); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, tableExecuteHandle); + } + @SuppressWarnings("deprecation") @Override public ConnectorMergeTableHandle beginMerge( diff --git a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java index 27307f20367..fef089e48a5 100644 --- a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java +++ b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java @@ -22,10 +22,12 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ColumnPosition; +import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; @@ -73,6 +75,29 @@ public Optional finishInsert( computedStatistics); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorAccessControl accessControl, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata.getTableHandleForExecute( + session, + accessControl, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, tableExecuteHandle); + } + @Override public ConnectorMergeTableHandle beginMerge( ConnectorSession session, diff --git a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java index 40173cdb42d..7bc738614d4 100644 --- a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java +++ b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java @@ -22,10 +22,12 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ColumnPosition; +import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; @@ -66,6 +68,29 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn, columnPosition); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorAccessControl accessControl, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata.getTableHandleForExecute( + session, + accessControl, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode); + } + + @Override + public Map executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + return internalMetadata.executeTableExecute(session, tableExecuteHandle); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java index 6247be1d4fd..a061d73d563 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java @@ -33,6 +33,8 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.procedure.Procedure; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; import java.util.List; @@ -101,6 +103,18 @@ protected GravitinoMetadata createGravitinoMetadata( throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); } + @Override + public Set getProcedures() { + Connector internalConnector = catalogConnectorContext.getInternalConnector(); + return internalConnector.getProcedures(); + } + + @Override + public Set getTableProcedures() { + Connector internalConnector = catalogConnectorContext.getInternalConnector(); + return internalConnector.getTableProcedures(); + } + @Override public List> getTableProperties() { return catalogConnectorContext.getTableProperties(); diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java index 7cf72da146b..834c891c465 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java @@ -23,16 +23,19 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; import io.trino.spi.TrinoException; import io.trino.spi.connector.AggregateFunction; import io.trino.spi.connector.AggregationApplicationResult; import io.trino.spi.connector.Assignment; +import io.trino.spi.connector.BeginTableExecuteResult; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.ConnectorTableMetadata; @@ -57,6 +60,7 @@ import io.trino.spi.statistics.ColumnStatistics; import io.trino.spi.statistics.TableStatistics; import io.trino.spi.type.Type; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -660,6 +664,46 @@ public Optional getInsertLayout( : new ConnectorTableLayout(result.getPartitionColumns())); } + @Override + public Optional getLayoutForTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + return internalMetadata + .getLayoutForTableExecute(session, tableExecuteHandle) + .map( + result -> + result.getPartitioning().isPresent() + ? new ConnectorTableLayout( + new GravitinoPartitioningHandle(result.getPartitioning().get()), + result.getPartitionColumns(), + result.supportsMultipleWritersPerPartition()) + : new ConnectorTableLayout(result.getPartitionColumns())); + } + + @Override + public BeginTableExecuteResult + beginTableExecute( + ConnectorSession session, + ConnectorTableExecuteHandle tableExecuteHandle, + ConnectorTableHandle updatedSourceTableHandle) { + BeginTableExecuteResult result = + internalMetadata.beginTableExecute( + session, tableExecuteHandle, GravitinoHandle.unWrap(updatedSourceTableHandle)); + SchemaTableName tableName = getTableName(updatedSourceTableHandle); + return new BeginTableExecuteResult<>( + result.getTableExecuteHandle(), + new GravitinoTableHandle( + tableName.getSchemaName(), tableName.getTableName(), result.getSourceHandle())); + } + + @Override + public void finishTableExecute( + ConnectorSession session, + ConnectorTableExecuteHandle tableExecuteHandle, + Collection fragments, + List tableExecuteState) { + internalMetadata.finishTableExecute(session, tableExecuteHandle, fragments, tableExecuteState); + } + protected SchemaTableName getTableName(ConnectorTableHandle tableHandle) { return ((GravitinoTableHandle) tableHandle).toSchemaTableName(); } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java index 7122a7f484c..96bdc34081a 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java @@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorPageSinkId; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import org.apache.commons.lang3.NotImplementedException; @@ -65,6 +66,16 @@ public ConnectorPageSink createPageSink( pageSinkId); } + @Override + public ConnectorPageSink createPageSink( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorTableExecuteHandle tableExecuteHandle, + ConnectorPageSinkId pageSinkId) { + return pageSinkProvider.createPageSink( + GravitinoHandle.unWrap(transactionHandle), session, tableExecuteHandle, pageSinkId); + } + @Override public ConnectorMergeSink createMergeSink( ConnectorTransactionHandle transactionHandle, diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java new file mode 100644 index 00000000000..05e90eddf12 --- /dev/null +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.trino.connector; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.trino.spi.connector.BeginTableExecuteResult; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.RetryMode; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.procedure.Procedure; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.SupportsSchemas; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; +import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog; +import org.junit.jupiter.api.Test; + +/** + * Tests that verify the Gravitino Trino connector properly delegates procedure-related operations + * to the internal connector, enabling support for Iceberg snapshot maintenance procedures. + */ +public class TestGravitinoConnectorProcedures { + + @Test + void testGetProceduresDelegatesToInternalConnector() { + Connector mockInternalConnector = mock(Connector.class); + Set expectedProcedures = Set.of(mock(Procedure.class), mock(Procedure.class)); + when(mockInternalConnector.getProcedures()).thenReturn(expectedProcedures); + + GravitinoConnector connector = createConnector(mockInternalConnector); + Set result = connector.getProcedures(); + + assertEquals(expectedProcedures, result); + verify(mockInternalConnector).getProcedures(); + } + + @Test + void testGetProceduresReturnsEmptySetWhenNoProcedures() { + Connector mockInternalConnector = mock(Connector.class); + when(mockInternalConnector.getProcedures()).thenReturn(Set.of()); + + GravitinoConnector connector = createConnector(mockInternalConnector); + Set result = connector.getProcedures(); + + assertTrue(result.isEmpty()); + } + + @Test + void testGetTableProceduresDelegatesToInternalConnector() { + Connector mockInternalConnector = mock(Connector.class); + Set expectedProcedures = Set.of(mock(TableProcedureMetadata.class)); + when(mockInternalConnector.getTableProcedures()).thenReturn(expectedProcedures); + + GravitinoConnector connector = createConnector(mockInternalConnector); + Set result = connector.getTableProcedures(); + + assertEquals(expectedProcedures, result); + verify(mockInternalConnector).getTableProcedures(); + } + + @Test + void testGetTableHandleForExecuteDelegatesToInternalMetadata() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableHandle internalTableHandle = mock(ConnectorTableHandle.class); + ConnectorTableExecuteHandle expectedExecuteHandle = mock(ConnectorTableExecuteHandle.class); + + GravitinoTableHandle gravitinoTableHandle = + new GravitinoTableHandle("test_schema", "test_table", internalTableHandle); + + Map executeProperties = Map.of(); + when(internalMetadata.getTableHandleForExecute( + any(), eq(internalTableHandle), anyString(), any(), any())) + .thenReturn(Optional.of(expectedExecuteHandle)); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + + Optional result = + metadata.getTableHandleForExecute( + session, gravitinoTableHandle, "optimize", executeProperties, RetryMode.NO_RETRIES); + + assertTrue(result.isPresent()); + assertSame(expectedExecuteHandle, result.get()); + verify(internalMetadata) + .getTableHandleForExecute( + session, internalTableHandle, "optimize", executeProperties, RetryMode.NO_RETRIES); + } + + @Test + void testBeginTableExecuteUnwrapsAndWrapsHandles() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableHandle internalTableHandle = mock(ConnectorTableHandle.class); + ConnectorTableExecuteHandle executeHandle = mock(ConnectorTableExecuteHandle.class); + ConnectorTableHandle resultSourceHandle = mock(ConnectorTableHandle.class); + ConnectorTableExecuteHandle resultExecuteHandle = mock(ConnectorTableExecuteHandle.class); + + GravitinoTableHandle gravitinoTableHandle = + new GravitinoTableHandle("test_schema", "test_table", internalTableHandle); + + BeginTableExecuteResult internalResult = + new BeginTableExecuteResult<>(resultExecuteHandle, resultSourceHandle); + + when(internalMetadata.beginTableExecute(session, executeHandle, internalTableHandle)) + .thenReturn(internalResult); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + + BeginTableExecuteResult result = + metadata.beginTableExecute(session, executeHandle, gravitinoTableHandle); + + assertSame(resultExecuteHandle, result.getTableExecuteHandle()); + assertTrue(result.getSourceHandle() instanceof GravitinoTableHandle); + GravitinoTableHandle wrappedSource = (GravitinoTableHandle) result.getSourceHandle(); + assertEquals("test_schema", wrappedSource.getSchemaName()); + assertEquals("test_table", wrappedSource.getTableName()); + assertSame(resultSourceHandle, wrappedSource.getInternalHandle()); + } + + @Test + void testFinishTableExecuteDelegatesToInternalMetadata() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableExecuteHandle executeHandle = mock(ConnectorTableExecuteHandle.class); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + metadata.finishTableExecute(session, executeHandle, java.util.List.of(), java.util.List.of()); + + verify(internalMetadata).finishTableExecute(eq(session), eq(executeHandle), any(), any()); + } + + @Test + void testExecuteTableExecuteDelegatesToInternalMetadata() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableExecuteHandle executeHandle = mock(ConnectorTableExecuteHandle.class); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + metadata.executeTableExecute(session, executeHandle); + + verify(internalMetadata).executeTableExecute(session, executeHandle); + } + + private GravitinoConnector createConnector(Connector internalConnector) { + GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class); + when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", "catalog")); + + GravitinoMetalake metalake = mockMetalake(); + + CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class); + when(mockContext.getCatalog()).thenReturn(mockCatalog); + when(mockContext.getMetalake()).thenReturn(metalake); + when(mockContext.getInternalConnector()).thenReturn(internalConnector); + + return new GravitinoConnector(mockContext); + } + + private GravitinoMetadata createMetadata(ConnectorMetadata internalMetadata) { + CatalogConnectorMetadata catalogConnectorMetadata = mock(CatalogConnectorMetadata.class); + CatalogConnectorMetadataAdapter metadataAdapter = mock(CatalogConnectorMetadataAdapter.class); + return new TestGravitinoMetadataImpl( + catalogConnectorMetadata, metadataAdapter, internalMetadata); + } + + private static GravitinoMetalake mockMetalake() { + GravitinoMetalake metalake = mock(GravitinoMetalake.class); + Catalog catalog = mock(Catalog.class); + when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class)); + when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class)); + when(metalake.loadCatalog(any())).thenReturn(catalog); + return metalake; + } + + private static final class TestGravitinoMetadataImpl extends GravitinoMetadata { + private TestGravitinoMetadataImpl( + CatalogConnectorMetadata catalogConnectorMetadata, + CatalogConnectorMetadataAdapter metadataAdapter, + ConnectorMetadata internalMetadata) { + super(catalogConnectorMetadata, metadataAdapter, internalMetadata); + } + + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata.getTableHandleForExecute( + session, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, tableExecuteHandle); + } + } +}