diff --git a/docs/flink-connector/flink-connector.md b/docs/flink-connector/flink-connector.md index 1d0c31664e1..7b973f8b77e 100644 --- a/docs/flink-connector/flink-connector.md +++ b/docs/flink-connector/flink-connector.md @@ -35,8 +35,11 @@ This capability allows users to perform federation queries, accessing data from | table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating | | table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating | +| table.catalog-store.gravitino.gravitino.allow.third-party-connector.list | string | (none) | List of allowed third-party connectors, separated by commas. | No | 1.3.0 | | table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 | +The `table.catalog-store.gravitino.gravitino.allow.third-party-connector.list` configuration allows you to specify a comma-separated list of catalog types. Catalogs created with these types will be stored in Flink's in-memory catalog store and will not be managed by Gravitino. This is useful for using standard Flink connectors (like `jdbc`, `hive`) that you do not wish to be governed by Gravitino. + To configure the Gravitino client, use properties prefixed with `table.catalog-store.gravitino.gravitino.client.`. These properties will be passed to the Gravitino client after removing the `table.catalog-store.gravitino.` prefix. **Example:** Setting `table.catalog-store.gravitino.gravitino.client.socketTimeoutMs` is equivalent to setting `gravitino.client.socketTimeoutMs` for the Gravitino client. @@ -48,6 +51,7 @@ Set the flink configuration in flink-conf.yaml. table.catalog-store.kind: gravitino table.catalog-store.gravitino.gravitino.metalake: metalake_demo table.catalog-store.gravitino.gravitino.uri: http://localhost:8090 +table.catalog-store.gravitino.gravitino.allow.third-party-connector.list: hive,jdbc table.catalog-store.gravitino.gravitino.client.socketTimeoutMs: 60000 table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs: 60000 ``` @@ -57,6 +61,7 @@ final Configuration configuration = new Configuration(); configuration.setString("table.catalog-store.kind", "gravitino"); configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "metalake_demo"); configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8090"); +configuration.setString("table.catalog-store.gravitino.gravitino.allow.third-party-connector.list", "hive,jdbc"); configuration.setString("table.catalog-store.gravitino.gravitino.client.socketTimeoutMs", "60000"); configuration.setString("table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs", "60000"); EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().withConfiguration(configuration); diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java index 89d85120e68..47364979b01 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java @@ -19,7 +19,9 @@ package org.apache.gravitino.flink.connector.store; +import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -31,6 +33,7 @@ import org.apache.flink.table.catalog.AbstractCatalogStore; import org.apache.flink.table.catalog.CatalogDescriptor; import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.factories.Factory; import org.apache.flink.util.Preconditions; @@ -45,26 +48,43 @@ /** GravitinoCatalogStore is used to store catalog information to Apache Gravitino server. */ public class GravitinoCatalogStore extends AbstractCatalogStore { private static final Logger LOG = LoggerFactory.getLogger(GravitinoCatalogStore.class); + private final GenericInMemoryCatalogStore memoryCatalogStore; private final GravitinoCatalogManager gravitinoCatalogManager; + private final ImmutableList allowThirdPartyConnectors; - public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) { - this.gravitinoCatalogManager = catalogManager; + public GravitinoCatalogStore( + GravitinoCatalogManager catalogManager, + GenericInMemoryCatalogStore memoryCatalogStore, + List allowThirdPartyConnectors) { + this.gravitinoCatalogManager = + Preconditions.checkNotNull(catalogManager, "CatalogManager cannot be null"); + this.memoryCatalogStore = + Preconditions.checkNotNull(memoryCatalogStore, "MemoryCatalogStore cannot be null"); + this.allowThirdPartyConnectors = + ImmutableList.copyOf( + Preconditions.checkNotNull( + allowThirdPartyConnectors, "AllowThirdPartyConnectors cannot be null")); } @Override public void storeCatalog(String catalogName, CatalogDescriptor descriptor) throws CatalogException { - Configuration configuration = descriptor.getConfiguration(); - Map gravitino = configuration.toMap(); - BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino); - Map gravitinoProperties = - catalogFactory.catalogPropertiesConverter().toGravitinoCatalogProperties(configuration); - gravitinoCatalogManager.createCatalog( - catalogName, - catalogFactory.gravitinoCatalogType(), - null, - catalogFactory.gravitinoCatalogProvider(), - gravitinoProperties); + String catalogType = descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE); + if (isAllowThirdPartyConnector(catalogType)) { + memoryCatalogStore.storeCatalog(catalogName, descriptor); + } else { + Configuration configuration = descriptor.getConfiguration(); + Map gravitino = configuration.toMap(); + BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino); + Map gravitinoProperties = + catalogFactory.catalogPropertiesConverter().toGravitinoCatalogProperties(configuration); + gravitinoCatalogManager.createCatalog( + catalogName, + catalogFactory.gravitinoCatalogType(), + null, + catalogFactory.gravitinoCatalogProvider(), + gravitinoProperties); + } } /** * Removes the specified catalog. @@ -75,13 +95,19 @@ public void storeCatalog(String catalogName, CatalogDescriptor descriptor) */ @Override public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException { - try { - boolean isDropped = gravitinoCatalogManager.dropCatalog(catalogName); - if (!ignoreIfNotExists && !isDropped) { - throw new CatalogException(String.format("Failed to remove the catalog: %s", catalogName)); + if (memoryCatalogStore.contains(catalogName)) { + memoryCatalogStore.removeCatalog(catalogName, ignoreIfNotExists); + } else { + try { + boolean isDropped = gravitinoCatalogManager.dropCatalog(catalogName); + if (!ignoreIfNotExists && !isDropped) { + throw new CatalogException( + String.format("Failed to remove the catalog: %s", catalogName)); + } + } catch (Exception e) { + throw new CatalogException( + String.format("Failed to remove the catalog: %s", catalogName), e); } - } catch (Exception e) { - throw new CatalogException(String.format("Failed to remove the catalog: %s", catalogName), e); } } @@ -94,6 +120,12 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws */ @Override public Optional getCatalog(String catalogName) throws CatalogException { + if (memoryCatalogStore.contains(catalogName)) { + Optional descriptor = memoryCatalogStore.getCatalog(catalogName); + if (descriptor.isPresent()) { + return descriptor; + } + } try { Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName); BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider()); @@ -113,16 +145,20 @@ public Optional getCatalog(String catalogName) throws Catalog @Override public Set listCatalogs() throws CatalogException { + Set catalogs = new HashSet<>(); + catalogs.addAll(memoryCatalogStore.listCatalogs()); try { - return gravitinoCatalogManager.listCatalogs(); + catalogs.addAll(gravitinoCatalogManager.listCatalogs()); } catch (Exception e) { throw new CatalogException("Failed to list catalog.", e); } + return catalogs; } @Override public boolean contains(String catalogName) throws CatalogException { - return gravitinoCatalogManager.contains(catalogName); + return memoryCatalogStore.contains(catalogName) + || gravitinoCatalogManager.contains(catalogName); } private BaseCatalogFactory getCatalogFactory(Map configuration) { @@ -176,4 +212,8 @@ private BaseCatalogFactory discoverFactories(Predicate predicate, Strin } return (BaseCatalogFactory) factories.get(0); } + + private boolean isAllowThirdPartyConnector(String type) { + return allowThirdPartyConnectors.contains(type); + } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java index b59b9cfc94f..187f3fa0394 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java @@ -21,6 +21,7 @@ import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper; import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO; +import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG; import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_CLIENT_CONFIG; import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_METALAKE; import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_URI; @@ -28,29 +29,40 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.CatalogStore; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.gravitino.client.GravitinoClientConfiguration; import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager; +import org.apache.gravitino.flink.connector.utils.FactoryUtils; /** The Factory for creating {@link GravitinoCatalogStore}. */ public class GravitinoCatalogStoreFactory implements CatalogStoreFactory { + private GravitinoCatalogManager catalogManager; + private GenericInMemoryCatalogStore memoryCatalogStore; + private List allowThirdPartyConnectors; @Override public CatalogStore createCatalogStore() { - return new GravitinoCatalogStore(catalogManager); + return new GravitinoCatalogStore(catalogManager, memoryCatalogStore, allowThirdPartyConnectors); } @Override public void open(Context context) throws CatalogException { + this.memoryCatalogStore = new GenericInMemoryCatalogStore(); + this.memoryCatalogStore.open(); + FactoryUtil.FactoryHelper factoryHelper = createCatalogStoreFactoryHelper(this, context); factoryHelper.validate(); @@ -64,6 +76,21 @@ public void open(Context context) throws CatalogException { "Both %s and %s must be set", GRAVITINO_URI.key(), GRAVITINO_METALAKE.key()); + allowThirdPartyConnectors = + Arrays.asList( + Optional.ofNullable(options.get(GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG)) + .map(s -> s.split(",")) + .orElse(new String[] {})) + .stream() + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + + Preconditions.checkArgument( + allowThirdPartyConnectors.stream().noneMatch(FactoryUtils.GRAVITINO_FACTORY_LIST::contains), + "The allowed third party connectors %s should not contain Gravitino connectors %s.", + allowThirdPartyConnectors, + FactoryUtils.GRAVITINO_FACTORY_LIST); this.catalogManager = GravitinoCatalogManager.create(gravitinoUri, gravitinoName, extractClientConfig(options)); @@ -71,6 +98,9 @@ public void open(Context context) throws CatalogException { @Override public void close() throws CatalogException { + if (memoryCatalogStore != null) { + memoryCatalogStore.close(); + } if (catalogManager != null) { catalogManager.close(); } @@ -88,7 +118,8 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { - return ImmutableSet.of(GRAVITINO_CLIENT_CONFIG); + return ImmutableSet.of( + GRAVITINO_CLIENT_CONFIG, GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG); } @VisibleForTesting diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java index cda42978817..7787aebc631 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java @@ -40,7 +40,12 @@ private GravitinoCatalogStoreFactoryOptions() {} .stringType() .noDefaultValue() .withDescription("The name of Gravitino metalake"); - + public static final ConfigOption GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG = + ConfigOptions.key("gravitino.allow.third-party-connector.list") + .stringType() + .noDefaultValue() + .withDescription( + "Comma-separated list of allowed third-party catalog types (handled by Flink's in-memory catalog store)"); public static final ConfigOption> GRAVITINO_CLIENT_CONFIG = ConfigOptions.key("gravitino.client") .mapType() diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java index efb04a9ab4e..01436a9e1c6 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java @@ -21,12 +21,17 @@ import static org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions; import static org.apache.flink.table.factories.FactoryUtil.validateWatermarkOptions; +import com.google.common.collect.ImmutableList; import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FactoryUtil; +import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions; +import org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactoryOptions; +import org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactoryOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,6 +41,16 @@ private FactoryUtils() {} private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class); + /** The list of Gravitino catalog factory identifiers. */ + public static final ImmutableList GRAVITINO_FACTORY_LIST = + ImmutableList.builder() + .add(GravitinoHiveCatalogFactoryOptions.IDENTIFIER) + .add(GravitinoIcebergCatalogFactoryOptions.IDENTIFIER) + .add(GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER) + .add(GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER) + .add(GravitinoPaimonCatalogFactoryOptions.IDENTIFIER) + .build(); + /** * Utility for working with {@link Factory}s. The {@link GravitinoCatalogFactoryHelper} override * the {@link FactoryUtil.CatalogFactoryHelper#validate()} method to validate the options. For the diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStore.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStore.java index 228d03e5e9d..b749a9d393a 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStore.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStore.java @@ -18,25 +18,51 @@ */ package org.apache.gravitino.flink.connector.store; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CatalogDescriptor; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStore; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager; +import org.junit.After; import org.junit.Before; import org.junit.Test; public class TestGravitinoCatalogStore { private GravitinoCatalogManager gravitinoCatalogMockManager; + private GenericInMemoryCatalogStore memoryCatalogStore; private GravitinoCatalogStore gravitinoCatalogStore; @Before public void setupCatalogStore() { gravitinoCatalogMockManager = mock(GravitinoCatalogManager.class); - gravitinoCatalogStore = new GravitinoCatalogStore(gravitinoCatalogMockManager); + memoryCatalogStore = new GenericInMemoryCatalogStore(); + memoryCatalogStore.open(); + gravitinoCatalogStore = + new GravitinoCatalogStore( + gravitinoCatalogMockManager, memoryCatalogStore, Arrays.asList("filesystem", "jdbc")); + } + + @After + public void closeCatalogStore() { + if (memoryCatalogStore != null) { + memoryCatalogStore.close(); + } } @Test @@ -94,4 +120,125 @@ public void testRemoveCatalog_UnexpectedException_shouldThrow() { } verify(gravitinoCatalogMockManager).dropCatalog(catalogName); } + + @Test + public void testStoreJdbcCatalog() { + String catalogName = "testJdbcCatalog"; + CatalogDescriptor descriptor = createJdbcCatalogDescriptor(catalogName); + + gravitinoCatalogStore.storeCatalog(catalogName, descriptor); + assertTrue(gravitinoCatalogStore.contains(catalogName)); + assertTrue(memoryCatalogStore.contains(catalogName)); + } + + @Test + public void testStoreHiveCatalogWithException() { + String catalogName = "testHiveCatalog"; + Configuration conf = new Configuration(); + conf.set(CommonCatalogOptions.CATALOG_TYPE, "hive"); + conf.setString("hive-conf-dir", "/tmp"); + CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf); + + try { + gravitinoCatalogStore.storeCatalog(catalogName, descriptor); + fail("Should throw RuntimeException"); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("Failed to correctly match the Flink catalog factory.")); + } + } + + @Test + public void testConstructorWithNullCatalogManagerShouldThrow() { + try { + new GravitinoCatalogStore(null, memoryCatalogStore, Arrays.asList("filesystem", "jdbc")); + fail("Expected NullPointerException to be thrown"); + } catch (NullPointerException e) { + assertEquals("CatalogManager cannot be null", e.getMessage()); + } + } + + @Test + public void testConstructorWithNullMemoryCatalogStoreShouldThrow() { + try { + new GravitinoCatalogStore( + gravitinoCatalogMockManager, null, Arrays.asList("filesystem", "jdbc")); + fail("Expected NullPointerException to be thrown"); + } catch (NullPointerException e) { + assertEquals("MemoryCatalogStore cannot be null", e.getMessage()); + } + } + + @Test + public void testConstructorWithNullAllowThirdPartyConnectorsShouldThrow() { + try { + new GravitinoCatalogStore(gravitinoCatalogMockManager, memoryCatalogStore, null); + fail("Expected NullPointerException to be thrown"); + } catch (NullPointerException e) { + assertEquals("AllowThirdPartyConnectors cannot be null", e.getMessage()); + } + } + + @Test + public void testStoreCatalogShouldUseDefensiveCopyOfAllowedConnectors() { + List connectors = new ArrayList<>(Arrays.asList("filesystem", "jdbc")); + GravitinoCatalogStore catalogStore = + new GravitinoCatalogStore(gravitinoCatalogMockManager, memoryCatalogStore, connectors); + connectors.clear(); + + String catalogName = "testJdbcCatalogWithDefensiveCopy"; + CatalogDescriptor descriptor = createJdbcCatalogDescriptor(catalogName); + + catalogStore.storeCatalog(catalogName, descriptor); + assertTrue(catalogStore.contains(catalogName)); + assertTrue(memoryCatalogStore.contains(catalogName)); + } + + @Test + public void testRemoveCatalogWhenMemoryCatalogExistsShouldRemoveFromMemory() { + String catalogName = "testMemoryJdbcCatalog"; + CatalogDescriptor descriptor = createJdbcCatalogDescriptor(catalogName); + gravitinoCatalogStore.storeCatalog(catalogName, descriptor); + + gravitinoCatalogStore.removeCatalog(catalogName, false); + + assertFalse(memoryCatalogStore.contains(catalogName)); + verifyNoInteractions(gravitinoCatalogMockManager); + } + + @Test + public void testGetCatalogWhenMemoryCatalogExistsShouldReturnMemoryDescriptor() { + String catalogName = "testMemoryJdbcCatalog"; + CatalogDescriptor descriptor = createJdbcCatalogDescriptor(catalogName); + gravitinoCatalogStore.storeCatalog(catalogName, descriptor); + + Optional actualCatalog = gravitinoCatalogStore.getCatalog(catalogName); + + assertTrue(actualCatalog.isPresent()); + assertEquals( + descriptor.getConfiguration().toMap(), actualCatalog.get().getConfiguration().toMap()); + verifyNoInteractions(gravitinoCatalogMockManager); + } + + @Test + public void testListCatalogsShouldMergeMemoryAndGravitinoCatalogs() { + String memoryCatalogName = "testMemoryJdbcCatalog"; + CatalogDescriptor descriptor = createJdbcCatalogDescriptor(memoryCatalogName); + gravitinoCatalogStore.storeCatalog(memoryCatalogName, descriptor); + when(gravitinoCatalogMockManager.listCatalogs()) + .thenReturn(new HashSet<>(Arrays.asList(memoryCatalogName, "testHiveCatalog"))); + + Set catalogs = gravitinoCatalogStore.listCatalogs(); + + assertEquals(new HashSet<>(Arrays.asList(memoryCatalogName, "testHiveCatalog")), catalogs); + verify(gravitinoCatalogMockManager).listCatalogs(); + } + + private CatalogDescriptor createJdbcCatalogDescriptor(String catalogName) { + Configuration conf = new Configuration(); + conf.set(CommonCatalogOptions.CATALOG_TYPE, "jdbc"); + conf.setString("url", "jdbc:mysql://localhost:3306/test"); + conf.setString("username", "test"); + conf.setString("password", "test"); + return CatalogDescriptor.of(catalogName, conf); + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoFlinkConfig.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoFlinkConfig.java index 995b9774f69..8d7f604acf5 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoFlinkConfig.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoFlinkConfig.java @@ -28,6 +28,7 @@ import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; +import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -72,4 +73,68 @@ private Map extractGrivitinoClientConfig(Configuration configura ReadableConfig options = factoryHelper.getOptions(); return extractClientConfig(options); } + + @Test + void testGravitinoCatalogStoreFactory() { + final Configuration configuration = new Configuration(); + configuration.setString( + "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO); + configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "flink"); + configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://127.0.0.1:8090"); + configuration.setString( + "table.catalog-store.gravitino.gravitino.allow.third-party-connector.list", + GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); + + CatalogStoreFactory.Context context = + TableFactoryUtil.buildCatalogStoreFactoryContext( + configuration, this.getClass().getClassLoader()); + GravitinoCatalogStoreFactory factory = new GravitinoCatalogStoreFactory(); + Exception e = + Assertions.assertThrows(IllegalArgumentException.class, () -> factory.open(context)); + Assertions.assertTrue( + e.getMessage() + .contains( + "The allowed third party connectors [gravitino-iceberg] should not contain Gravitino connectors")); + } + + @Test + void testGravitinoCatalogStoreFactoryWithAllowedTypeAndUnsupportedAuthType() { + final Configuration configuration = new Configuration(); + configuration.setString( + "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO); + configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "flink"); + configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://127.0.0.1:8090"); + configuration.setString( + "table.catalog-store.gravitino.gravitino.allow.third-party-connector.list", "jdbc"); + + CatalogStoreFactory.Context context = + TableFactoryUtil.buildCatalogStoreFactoryContext( + configuration, this.getClass().getClassLoader()); + GravitinoCatalogStoreFactory factory = new GravitinoCatalogStoreFactory(); + + // It will throw exception because the Gravitino server is not running. + // But it should pass the validation. + Exception e = Assertions.assertThrows(Exception.class, () -> factory.open(context)); + Assertions.assertFalse(e.getMessage().contains("The allowed third party connectors")); + } + + @Test + void testGravitinoCatalogStoreFactoryWithUnsupportedAuthType() { + final Configuration configuration = new Configuration(); + configuration.setString( + "table.catalog-store.kind", GravitinoCatalogStoreFactoryOptions.GRAVITINO); + configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "flink"); + configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://127.0.0.1:8090"); + configuration.setString( + "table.catalog-store.gravitino.gravitino.client.auth.type", "unsupported"); + + CatalogStoreFactory.Context context = + TableFactoryUtil.buildCatalogStoreFactoryContext( + configuration, this.getClass().getClassLoader()); + GravitinoCatalogStoreFactory factory = new GravitinoCatalogStoreFactory(); + + IllegalArgumentException e = + Assertions.assertThrows(IllegalArgumentException.class, () -> factory.open(context)); + Assertions.assertTrue(e.getMessage().contains("Unsupported auth type 'unsupported'")); + } }