From 0720c0cff6ebeefab5ff522abe4b8cae092aea69 Mon Sep 17 00:00:00 2001 From: Akshay Thorat Date: Sun, 22 Mar 2026 13:01:59 -0700 Subject: [PATCH] [#10280] Support Iceberg snapshot maintenance procedures via Gravitino Trino Connector Delegate Iceberg snapshot maintenance procedures (expire_snapshots, remove_orphan_files, rewrite_data_files/optimize, rewrite_manifests) from the Gravitino Trino Connector to the internal Iceberg connector. Changes: - GravitinoConnector: Add getProcedures() and getTableProcedures() - GravitinoMetadata: Add getLayoutForTableExecute(), beginTableExecute(), finishTableExecute() in base class - Version-specific metadata classes: Add getTableHandleForExecute() and executeTableExecute() with correct SPI signatures per Trino version - GravitinoPageSinkProvider: Add createPageSink for ConnectorTableExecuteHandle - Add unit tests (TestGravitinoConnectorProcedures) and integration tests Closes #10280 Wrap ConnectorTableExecuteHandle for cross-classloader serialization Address Copilot review: wrap ConnectorTableExecuteHandle from the internal connector in GravitinoTableExecuteHandle before returning to Trino, and unwrap before delegating. This matches the existing pattern used for insert/merge handles and prevents serialization failures in distributed table procedure execution. Changes: - Create GravitinoTableExecuteHandle wrapper class - Wrap handle in getTableHandleForExecute() across all version-specific metadata classes (435, 440, 446, 452, 469, 478) - Unwrap handle in executeTableExecute(), beginTableExecute(), finishTableExecute(), getLayoutForTableExecute(), and createPageSink() - Fix misleading comment in integration test SQL - Update unit tests to verify wrapping/unwrapping behavior Address review: use List.of(), add rewrite_manifests integration test --- .../00013_snapshot_maintenance.sql | 69 +++++ .../00013_snapshot_maintenance.txt | 35 +++ .../trino/connector/GravitinoMetadata435.java | 25 ++ .../trino/connector/GravitinoMetadata440.java | 25 ++ .../trino/connector/GravitinoMetadata446.java | 25 ++ .../trino/connector/GravitinoMetadata452.java | 25 ++ .../trino/connector/GravitinoMetadata469.java | 27 ++ .../trino/connector/GravitinoMetadata478.java | 28 ++ .../trino/connector/GravitinoConnector.java | 14 + .../trino/connector/GravitinoMetadata.java | 46 ++++ .../connector/GravitinoPageSinkProvider.java | 14 + .../GravitinoTableExecuteHandle.java | 61 +++++ .../TestGravitinoConnectorProcedures.java | 254 ++++++++++++++++++ 13 files changed, 648 insertions(+) create mode 100644 trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql create mode 100644 trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt create mode 100644 trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java create mode 100644 trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql new file mode 100644 index 00000000000..a44a2af6ec5 --- /dev/null +++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.sql @@ -0,0 +1,69 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- Test Iceberg snapshot maintenance procedures via Gravitino connector + +CREATE SCHEMA IF NOT EXISTS gt_snapshot_test; + +CREATE TABLE gt_snapshot_test.maintenance_table ( + id int, + name varchar +); + +-- Insert data to create snapshots +INSERT INTO gt_snapshot_test.maintenance_table VALUES (1, 'alice'); + +INSERT INTO gt_snapshot_test.maintenance_table VALUES (2, 'bob'); + +INSERT INTO gt_snapshot_test.maintenance_table VALUES (3, 'charlie'); + +-- Verify we have multiple snapshots +SELECT count(*) >= 3 FROM "gt_snapshot_test"."maintenance_table$snapshots"; + +-- Test expire_snapshots procedure (expire all snapshots older than now) +ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE expire_snapshots(retention_threshold => '0s'); + +-- Verify table data is still intact after expire_snapshots +SELECT count(*) FROM gt_snapshot_test.maintenance_table; + +-- Test remove_orphan_files procedure +ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE remove_orphan_files(retention_threshold => '0s'); + +-- Verify table data is still intact after remove_orphan_files +SELECT count(*) FROM gt_snapshot_test.maintenance_table; + +-- Insert more data to create small files for optimize (rewrite_data_files) +INSERT INTO gt_snapshot_test.maintenance_table VALUES (4, 'dave'); + +INSERT INTO gt_snapshot_test.maintenance_table VALUES (5, 'eve'); + +-- Test optimize (rewrite_data_files) table procedure +ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE optimize; + +-- Verify all data is still accessible after optimize +SELECT count(*) FROM gt_snapshot_test.maintenance_table; + +-- Test rewrite_manifests procedure +ALTER TABLE gt_snapshot_test.maintenance_table EXECUTE rewrite_manifests; + +-- Verify all data is still accessible after rewrite_manifests +SELECT count(*) FROM gt_snapshot_test.maintenance_table; + +-- Cleanup +DROP TABLE gt_snapshot_test.maintenance_table; + +DROP SCHEMA gt_snapshot_test; diff --git a/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt new file mode 100644 index 00000000000..60fd3e43dfa --- /dev/null +++ b/trino-connector/integration-test/src/test/resources/trino-ci-testset/testsets/lakehouse-iceberg/00013_snapshot_maintenance.txt @@ -0,0 +1,35 @@ +CREATE SCHEMA + +CREATE TABLE + +INSERT: 1 row + +INSERT: 1 row + +INSERT: 1 row + +"true" + +EXECUTE expire_snapshots + +"3" + +EXECUTE remove_orphan_files + +"3" + +INSERT: 1 row + +INSERT: 1 row + +EXECUTE optimize + +"5" + +EXECUTE rewrite_manifests + +"5" + +DROP TABLE + +DROP SCHEMA diff --git a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java index 919d53f4a66..fdb584bb244 100644 --- a/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java +++ b/trino-connector/trino-connector-435-439/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata435.java @@ -24,11 +24,13 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; +import java.util.Map; import java.util.Optional; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; @@ -50,6 +52,29 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata + .getTableHandleForExecute( + session, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode) + .map(GravitinoTableExecuteHandle::new); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java index afcf61bd606..530231cdc9d 100644 --- a/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java +++ b/trino-connector/trino-connector-440-445/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata440.java @@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; @@ -52,6 +54,29 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata + .getTableHandleForExecute( + session, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode) + .map(GravitinoTableExecuteHandle::new); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java index 86345379c95..6dbc31b0413 100644 --- a/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java +++ b/trino-connector/trino-connector-446-451/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata446.java @@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; @@ -52,6 +54,29 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata + .getTableHandleForExecute( + session, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode) + .map(GravitinoTableExecuteHandle::new); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java index ff43beb2908..a36249d71ac 100644 --- a/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java +++ b/trino-connector/trino-connector-452-468/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata452.java @@ -24,12 +24,14 @@ import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.statistics.ComputedStatistics; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; @@ -67,6 +69,29 @@ public Optional finishInsert( computedStatistics); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata + .getTableHandleForExecute( + session, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode) + .map(GravitinoTableExecuteHandle::new); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); + } + @SuppressWarnings("deprecation") @Override public ConnectorMergeTableHandle beginMerge( diff --git a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java index 27307f20367..ae40f3b58a1 100644 --- a/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java +++ b/trino-connector/trino-connector-469-472/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata469.java @@ -22,10 +22,12 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ColumnPosition; +import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; @@ -73,6 +75,31 @@ public Optional finishInsert( computedStatistics); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorAccessControl accessControl, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata + .getTableHandleForExecute( + session, + accessControl, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode) + .map(GravitinoTableExecuteHandle::new); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); + } + @Override public ConnectorMergeTableHandle beginMerge( ConnectorSession session, diff --git a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java index 40173cdb42d..c67be1527a4 100644 --- a/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java +++ b/trino-connector/trino-connector-473-478/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata478.java @@ -22,10 +22,12 @@ import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ColumnPosition; +import io.trino.spi.connector.ConnectorAccessControl; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMergeTableHandle; import io.trino.spi.connector.ConnectorOutputMetadata; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.RetryMode; import io.trino.spi.connector.SchemaTableName; @@ -66,6 +68,32 @@ public void addColumn( catalogConnectorMetadata.addColumn(getTableName(tableHandle), gravitinoColumn, columnPosition); } + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorAccessControl accessControl, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata + .getTableHandleForExecute( + session, + accessControl, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode) + .map(GravitinoTableExecuteHandle::new); + } + + @Override + public Map executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + return internalMetadata.executeTableExecute( + session, GravitinoHandle.unWrap(tableExecuteHandle)); + } + @Override public Optional finishInsert( ConnectorSession session, diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java index 6247be1d4fd..a061d73d563 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoConnector.java @@ -33,6 +33,8 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorSplitManager; import io.trino.spi.connector.ConnectorTransactionHandle; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.procedure.Procedure; import io.trino.spi.session.PropertyMetadata; import io.trino.spi.transaction.IsolationLevel; import java.util.List; @@ -101,6 +103,18 @@ protected GravitinoMetadata createGravitinoMetadata( throw new TrinoException(NOT_SUPPORTED, "Should be overridden in subclass"); } + @Override + public Set getProcedures() { + Connector internalConnector = catalogConnectorContext.getInternalConnector(); + return internalConnector.getProcedures(); + } + + @Override + public Set getTableProcedures() { + Connector internalConnector = catalogConnectorContext.getInternalConnector(); + return internalConnector.getTableProcedures(); + } + @Override public List> getTableProperties() { return catalogConnectorContext.getTableProperties(); diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java index ba3ca624dc4..ab8d0ed7530 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoMetadata.java @@ -23,16 +23,19 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; import io.trino.spi.TrinoException; import io.trino.spi.connector.AggregateFunction; import io.trino.spi.connector.AggregationApplicationResult; import io.trino.spi.connector.Assignment; +import io.trino.spi.connector.BeginTableExecuteResult; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorInsertTableHandle; import io.trino.spi.connector.ConnectorMetadata; import io.trino.spi.connector.ConnectorPartitioningHandle; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.ConnectorTableLayout; import io.trino.spi.connector.ConnectorTableMetadata; @@ -676,6 +679,49 @@ public Optional getInsertLayout( : new ConnectorTableLayout(result.getPartitionColumns())); } + @Override + public Optional getLayoutForTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + return internalMetadata + .getLayoutForTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)) + .map( + result -> + result.getPartitioning().isPresent() + ? new ConnectorTableLayout( + new GravitinoPartitioningHandle(result.getPartitioning().get()), + result.getPartitionColumns(), + result.supportsMultipleWritersPerPartition()) + : new ConnectorTableLayout(result.getPartitionColumns())); + } + + @Override + public BeginTableExecuteResult + beginTableExecute( + ConnectorSession session, + ConnectorTableExecuteHandle tableExecuteHandle, + ConnectorTableHandle updatedSourceTableHandle) { + BeginTableExecuteResult result = + internalMetadata.beginTableExecute( + session, + GravitinoHandle.unWrap(tableExecuteHandle), + GravitinoHandle.unWrap(updatedSourceTableHandle)); + SchemaTableName tableName = getTableName(updatedSourceTableHandle); + return new BeginTableExecuteResult<>( + new GravitinoTableExecuteHandle(result.getTableExecuteHandle()), + new GravitinoTableHandle( + tableName.getSchemaName(), tableName.getTableName(), result.getSourceHandle())); + } + + @Override + public void finishTableExecute( + ConnectorSession session, + ConnectorTableExecuteHandle tableExecuteHandle, + Collection fragments, + List tableExecuteState) { + internalMetadata.finishTableExecute( + session, GravitinoHandle.unWrap(tableExecuteHandle), fragments, tableExecuteState); + } + protected SchemaTableName getTableName(ConnectorTableHandle tableHandle) { return ((GravitinoTableHandle) tableHandle).toSchemaTableName(); } diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java index 7122a7f484c..7fc546ef680 100644 --- a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoPageSinkProvider.java @@ -26,6 +26,7 @@ import io.trino.spi.connector.ConnectorPageSinkId; import io.trino.spi.connector.ConnectorPageSinkProvider; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; import io.trino.spi.connector.ConnectorTransactionHandle; import org.apache.commons.lang3.NotImplementedException; @@ -65,6 +66,19 @@ public ConnectorPageSink createPageSink( pageSinkId); } + @Override + public ConnectorPageSink createPageSink( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorTableExecuteHandle tableExecuteHandle, + ConnectorPageSinkId pageSinkId) { + return pageSinkProvider.createPageSink( + GravitinoHandle.unWrap(transactionHandle), + session, + GravitinoHandle.unWrap(tableExecuteHandle), + pageSinkId); + } + @Override public ConnectorMergeSink createMergeSink( ConnectorTransactionHandle transactionHandle, diff --git a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java new file mode 100644 index 00000000000..2bf93484f66 --- /dev/null +++ b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/GravitinoTableExecuteHandle.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.trino.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import io.trino.spi.connector.ConnectorTableExecuteHandle; + +/** The GravitinoTableExecuteHandle is used for handling table execute operations. */ +public class GravitinoTableExecuteHandle + implements ConnectorTableExecuteHandle, GravitinoHandle { + + private HandleWrapper handleWrapper = + new HandleWrapper<>(ConnectorTableExecuteHandle.class); + + /** + * Constructs a new GravitinoTableExecuteHandle from a serialized handle string. + * + * @param handleString the serialized handle string + */ + @JsonCreator + public GravitinoTableExecuteHandle(@JsonProperty(HANDLE_STRING) String handleString) { + this.handleWrapper = handleWrapper.fromJson(handleString); + } + + /** + * Constructs a new GravitinoTableExecuteHandle from a ConnectorTableExecuteHandle. + * + * @param tableExecuteHandle the internal connector table execute handle + */ + public GravitinoTableExecuteHandle(ConnectorTableExecuteHandle tableExecuteHandle) { + this.handleWrapper = new HandleWrapper<>(tableExecuteHandle); + } + + @JsonProperty + @Override + public String getHandleString() { + return handleWrapper.toJson(); + } + + @Override + public ConnectorTableExecuteHandle getInternalHandle() { + return handleWrapper.getHandle(); + } +} diff --git a/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java new file mode 100644 index 00000000000..d551f635dd2 --- /dev/null +++ b/trino-connector/trino-connector/src/test/java/org/apache/gravitino/trino/connector/TestGravitinoConnectorProcedures.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.gravitino.trino.connector; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.trino.spi.connector.BeginTableExecuteResult; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorMetadata; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableExecuteHandle; +import io.trino.spi.connector.ConnectorTableHandle; +import io.trino.spi.connector.RetryMode; +import io.trino.spi.connector.TableProcedureMetadata; +import io.trino.spi.procedure.Procedure; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.SupportsSchemas; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.rel.TableCatalog; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorContext; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadata; +import org.apache.gravitino.trino.connector.catalog.CatalogConnectorMetadataAdapter; +import org.apache.gravitino.trino.connector.metadata.GravitinoCatalog; +import org.junit.jupiter.api.Test; + +/** + * Tests that verify the Gravitino Trino connector properly delegates procedure-related operations + * to the internal connector, enabling support for Iceberg snapshot maintenance procedures. + */ +public class TestGravitinoConnectorProcedures { + + @Test + void testGetProceduresDelegatesToInternalConnector() { + Connector mockInternalConnector = mock(Connector.class); + Set expectedProcedures = Set.of(mock(Procedure.class), mock(Procedure.class)); + when(mockInternalConnector.getProcedures()).thenReturn(expectedProcedures); + + GravitinoConnector connector = createConnector(mockInternalConnector); + Set result = connector.getProcedures(); + + assertEquals(expectedProcedures, result); + verify(mockInternalConnector).getProcedures(); + } + + @Test + void testGetProceduresReturnsEmptySetWhenNoProcedures() { + Connector mockInternalConnector = mock(Connector.class); + when(mockInternalConnector.getProcedures()).thenReturn(Set.of()); + + GravitinoConnector connector = createConnector(mockInternalConnector); + Set result = connector.getProcedures(); + + assertTrue(result.isEmpty()); + } + + @Test + void testGetTableProceduresDelegatesToInternalConnector() { + Connector mockInternalConnector = mock(Connector.class); + Set expectedProcedures = Set.of(mock(TableProcedureMetadata.class)); + when(mockInternalConnector.getTableProcedures()).thenReturn(expectedProcedures); + + GravitinoConnector connector = createConnector(mockInternalConnector); + Set result = connector.getTableProcedures(); + + assertEquals(expectedProcedures, result); + verify(mockInternalConnector).getTableProcedures(); + } + + @Test + void testGetTableHandleForExecuteDelegatesToInternalMetadata() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableHandle internalTableHandle = mock(ConnectorTableHandle.class); + ConnectorTableExecuteHandle expectedExecuteHandle = mock(ConnectorTableExecuteHandle.class); + + GravitinoTableHandle gravitinoTableHandle = + new GravitinoTableHandle("test_schema", "test_table", internalTableHandle); + + Map executeProperties = Map.of(); + when(internalMetadata.getTableHandleForExecute( + any(), eq(internalTableHandle), anyString(), any(), any())) + .thenReturn(Optional.of(expectedExecuteHandle)); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + + Optional result = + metadata.getTableHandleForExecute( + session, gravitinoTableHandle, "optimize", executeProperties, RetryMode.NO_RETRIES); + + assertTrue(result.isPresent()); + assertTrue(result.get() instanceof GravitinoTableExecuteHandle); + assertSame( + expectedExecuteHandle, ((GravitinoTableExecuteHandle) result.get()).getInternalHandle()); + verify(internalMetadata) + .getTableHandleForExecute( + session, internalTableHandle, "optimize", executeProperties, RetryMode.NO_RETRIES); + } + + @Test + void testBeginTableExecuteUnwrapsAndWrapsHandles() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableHandle internalTableHandle = mock(ConnectorTableHandle.class); + ConnectorTableExecuteHandle executeHandle = mock(ConnectorTableExecuteHandle.class); + ConnectorTableHandle resultSourceHandle = mock(ConnectorTableHandle.class); + ConnectorTableExecuteHandle resultExecuteHandle = mock(ConnectorTableExecuteHandle.class); + + GravitinoTableHandle gravitinoTableHandle = + new GravitinoTableHandle("test_schema", "test_table", internalTableHandle); + GravitinoTableExecuteHandle wrappedExecuteHandle = + new GravitinoTableExecuteHandle(executeHandle); + + BeginTableExecuteResult internalResult = + new BeginTableExecuteResult<>(resultExecuteHandle, resultSourceHandle); + + when(internalMetadata.beginTableExecute(session, executeHandle, internalTableHandle)) + .thenReturn(internalResult); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + + BeginTableExecuteResult result = + metadata.beginTableExecute(session, wrappedExecuteHandle, gravitinoTableHandle); + + assertTrue(result.getTableExecuteHandle() instanceof GravitinoTableExecuteHandle); + assertSame( + resultExecuteHandle, + ((GravitinoTableExecuteHandle) result.getTableExecuteHandle()).getInternalHandle()); + assertTrue(result.getSourceHandle() instanceof GravitinoTableHandle); + GravitinoTableHandle wrappedSource = (GravitinoTableHandle) result.getSourceHandle(); + assertEquals("test_schema", wrappedSource.getSchemaName()); + assertEquals("test_table", wrappedSource.getTableName()); + assertSame(resultSourceHandle, wrappedSource.getInternalHandle()); + } + + @Test + void testFinishTableExecuteDelegatesToInternalMetadata() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableExecuteHandle internalExecuteHandle = mock(ConnectorTableExecuteHandle.class); + GravitinoTableExecuteHandle wrappedHandle = + new GravitinoTableExecuteHandle(internalExecuteHandle); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + metadata.finishTableExecute(session, wrappedHandle, List.of(), List.of()); + + verify(internalMetadata) + .finishTableExecute(eq(session), eq(internalExecuteHandle), any(), any()); + } + + @Test + void testExecuteTableExecuteDelegatesToInternalMetadata() { + ConnectorMetadata internalMetadata = mock(ConnectorMetadata.class); + ConnectorSession session = mock(ConnectorSession.class); + ConnectorTableExecuteHandle internalExecuteHandle = mock(ConnectorTableExecuteHandle.class); + GravitinoTableExecuteHandle wrappedHandle = + new GravitinoTableExecuteHandle(internalExecuteHandle); + + GravitinoMetadata metadata = createMetadata(internalMetadata); + metadata.executeTableExecute(session, wrappedHandle); + + verify(internalMetadata).executeTableExecute(session, internalExecuteHandle); + } + + private GravitinoConnector createConnector(Connector internalConnector) { + GravitinoCatalog mockCatalog = mock(GravitinoCatalog.class); + when(mockCatalog.geNameIdentifier()).thenReturn(NameIdentifier.of("metalake", "catalog")); + + GravitinoMetalake metalake = mockMetalake(); + + CatalogConnectorContext mockContext = mock(CatalogConnectorContext.class); + when(mockContext.getCatalog()).thenReturn(mockCatalog); + when(mockContext.getMetalake()).thenReturn(metalake); + when(mockContext.getInternalConnector()).thenReturn(internalConnector); + + return new GravitinoConnector(mockContext); + } + + private GravitinoMetadata createMetadata(ConnectorMetadata internalMetadata) { + CatalogConnectorMetadata catalogConnectorMetadata = mock(CatalogConnectorMetadata.class); + CatalogConnectorMetadataAdapter metadataAdapter = mock(CatalogConnectorMetadataAdapter.class); + return new TestGravitinoMetadataImpl( + catalogConnectorMetadata, metadataAdapter, internalMetadata); + } + + private static GravitinoMetalake mockMetalake() { + GravitinoMetalake metalake = mock(GravitinoMetalake.class); + Catalog catalog = mock(Catalog.class); + when(catalog.asSchemas()).thenReturn(mock(SupportsSchemas.class)); + when(catalog.asTableCatalog()).thenReturn(mock(TableCatalog.class)); + when(metalake.loadCatalog(any())).thenReturn(catalog); + return metalake; + } + + private static final class TestGravitinoMetadataImpl extends GravitinoMetadata { + private TestGravitinoMetadataImpl( + CatalogConnectorMetadata catalogConnectorMetadata, + CatalogConnectorMetadataAdapter metadataAdapter, + ConnectorMetadata internalMetadata) { + super(catalogConnectorMetadata, metadataAdapter, internalMetadata); + } + + @Override + public Optional getTableHandleForExecute( + ConnectorSession session, + ConnectorTableHandle tableHandle, + String procedureName, + Map executeProperties, + RetryMode retryMode) { + return internalMetadata + .getTableHandleForExecute( + session, + GravitinoHandle.unWrap(tableHandle), + procedureName, + executeProperties, + retryMode) + .map(GravitinoTableExecuteHandle::new); + } + + @Override + public void executeTableExecute( + ConnectorSession session, ConnectorTableExecuteHandle tableExecuteHandle) { + internalMetadata.executeTableExecute(session, GravitinoHandle.unWrap(tableExecuteHandle)); + } + } +}