Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/flink-connector/flink-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ This capability allows users to perform federation queries, accessing data from
| table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.allow.third-party-connector.list | string | (none) | List of allowed third-party connectors, separated by commas. | No | 1.3.0 |
| table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 |

The `table.catalog-store.gravitino.gravitino.allow.third-party-connector.list` configuration allows you to specify a comma-separated list of catalog types. Catalogs created with these types will be stored in Flink's in-memory catalog store and will not be managed by Gravitino. This is useful for using standard Flink connectors (like `jdbc`, `hive`) that you do not wish to be governed by Gravitino.

To configure the Gravitino client, use properties prefixed with `table.catalog-store.gravitino.gravitino.client.`. These properties will be passed to the Gravitino client after removing the `table.catalog-store.gravitino.` prefix.

**Example:** Setting `table.catalog-store.gravitino.gravitino.client.socketTimeoutMs` is equivalent to setting `gravitino.client.socketTimeoutMs` for the Gravitino client.
Expand All @@ -48,6 +51,7 @@ Set the flink configuration in flink-conf.yaml.
table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: metalake_demo
table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
table.catalog-store.gravitino.gravitino.allow.third-party-connector.list: hive,jdbc
table.catalog-store.gravitino.gravitino.client.socketTimeoutMs: 60000
table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs: 60000
```
Expand All @@ -57,6 +61,7 @@ final Configuration configuration = new Configuration();
configuration.setString("table.catalog-store.kind", "gravitino");
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "metalake_demo");
configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8090");
configuration.setString("table.catalog-store.gravitino.gravitino.allow.third-party-connector.list", "hive,jdbc");
configuration.setString("table.catalog-store.gravitino.gravitino.client.socketTimeoutMs", "60000");
configuration.setString("table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs", "60000");
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().withConfiguration(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.gravitino.flink.connector.store;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -31,6 +32,7 @@
import org.apache.flink.table.catalog.AbstractCatalogStore;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
Expand All @@ -45,26 +47,38 @@
/** GravitinoCatalogStore is used to store catalog information to Apache Gravitino server. */
public class GravitinoCatalogStore extends AbstractCatalogStore {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoCatalogStore.class);
private final GenericInMemoryCatalogStore memoryCatalogStore;
private final GravitinoCatalogManager gravitinoCatalogManager;
private List<String> allowThirdPartyConnectors;

public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) {
public GravitinoCatalogStore(
GravitinoCatalogManager catalogManager,
GenericInMemoryCatalogStore memoryCatalogStore,
List<String> allowThirdPartyConnectors) {
this.gravitinoCatalogManager = catalogManager;
this.memoryCatalogStore = memoryCatalogStore;
this.allowThirdPartyConnectors = allowThirdPartyConnectors;
}

@Override
public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
throws CatalogException {
Configuration configuration = descriptor.getConfiguration();
Map<String, String> gravitino = configuration.toMap();
BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino);
Map<String, String> gravitinoProperties =
catalogFactory.catalogPropertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
null,
catalogFactory.gravitinoCatalogProvider(),
gravitinoProperties);
String catalogType = descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE);
if (isAllowThirdPartyConnector(catalogType)) {
memoryCatalogStore.storeCatalog(catalogName, descriptor);
} else {
Configuration configuration = descriptor.getConfiguration();
Map<String, String> gravitino = configuration.toMap();
BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino);
Map<String, String> gravitinoProperties =
catalogFactory.catalogPropertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
null,
catalogFactory.gravitinoCatalogProvider(),
gravitinoProperties);
}
}
/**
* Removes the specified catalog.
Expand All @@ -75,13 +89,19 @@ public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
*/
@Override
public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
try {
boolean isDropped = gravitinoCatalogManager.dropCatalog(catalogName);
if (!ignoreIfNotExists && !isDropped) {
throw new CatalogException(String.format("Failed to remove the catalog: %s", catalogName));
if (memoryCatalogStore.contains(catalogName)) {
memoryCatalogStore.removeCatalog(catalogName, ignoreIfNotExists);
} else {
try {
boolean isDropped = gravitinoCatalogManager.dropCatalog(catalogName);
if (!ignoreIfNotExists && !isDropped) {
throw new CatalogException(
String.format("Failed to remove the catalog: %s", catalogName));
}
} catch (Exception e) {
throw new CatalogException(
String.format("Failed to remove the catalog: %s", catalogName), e);
}
} catch (Exception e) {
throw new CatalogException(String.format("Failed to remove the catalog: %s", catalogName), e);
}
}

Expand All @@ -94,6 +114,12 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws
*/
@Override
public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
if (memoryCatalogStore.contains(catalogName)) {
Optional<CatalogDescriptor> descriptor = memoryCatalogStore.getCatalog(catalogName);
if (descriptor.isPresent()) {
return descriptor;
}
}
try {
Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider());
Expand All @@ -113,16 +139,20 @@ public Optional<CatalogDescriptor> getCatalog(String catalogName) throws Catalog

@Override
public Set<String> listCatalogs() throws CatalogException {
Set<String> catalogs = new HashSet<>();
catalogs.addAll(memoryCatalogStore.listCatalogs());
try {
return gravitinoCatalogManager.listCatalogs();
catalogs.addAll(gravitinoCatalogManager.listCatalogs());
} catch (Exception e) {
throw new CatalogException("Failed to list catalog.", e);
}
return catalogs;
}

@Override
public boolean contains(String catalogName) throws CatalogException {
return gravitinoCatalogManager.contains(catalogName);
return memoryCatalogStore.contains(catalogName)
|| gravitinoCatalogManager.contains(catalogName);
}

private BaseCatalogFactory getCatalogFactory(Map<String, String> configuration) {
Expand Down Expand Up @@ -176,4 +206,8 @@ private BaseCatalogFactory discoverFactories(Predicate<Factory> predicate, Strin
}
return (BaseCatalogFactory) factories.get(0);
}

private boolean isAllowThirdPartyConnector(String type) {
return allowThirdPartyConnectors.contains(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,48 @@

import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_CLIENT_CONFIG;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_METALAKE;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_URI;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.client.GravitinoClientConfiguration;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;

/** The Factory for creating {@link GravitinoCatalogStore}. */
public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {

private GravitinoCatalogManager catalogManager;
private GenericInMemoryCatalogStore memoryCatalogStore;
private List<String> allowThirdPartyConnectors;

@Override
public CatalogStore createCatalogStore() {
return new GravitinoCatalogStore(catalogManager);
return new GravitinoCatalogStore(catalogManager, memoryCatalogStore, allowThirdPartyConnectors);
}

@Override
public void open(Context context) throws CatalogException {
this.memoryCatalogStore = new GenericInMemoryCatalogStore();
this.memoryCatalogStore.open();

FactoryUtil.FactoryHelper<CatalogStoreFactory> factoryHelper =
createCatalogStoreFactoryHelper(this, context);
factoryHelper.validate();
Expand All @@ -64,13 +76,31 @@ public void open(Context context) throws CatalogException {
"Both %s and %s must be set",
GRAVITINO_URI.key(),
GRAVITINO_METALAKE.key());
allowThirdPartyConnectors =
Arrays.asList(
Optional.ofNullable(options.get(GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG))
.map(s -> s.split(","))
.orElse(new String[] {}))
.stream()
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());

Preconditions.checkArgument(
allowThirdPartyConnectors.stream().noneMatch(FactoryUtils.gravitinoFactoryList::contains),
"The allowed third party connectors %s should not contain Gravitino connectors %s.",
allowThirdPartyConnectors,
FactoryUtils.gravitinoFactoryList);

this.catalogManager =
GravitinoCatalogManager.create(gravitinoUri, gravitinoName, extractClientConfig(options));
}

@Override
public void close() throws CatalogException {
if (memoryCatalogStore != null) {
memoryCatalogStore.close();
}
if (catalogManager != null) {
catalogManager.close();
}
Expand All @@ -88,7 +118,8 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return ImmutableSet.of(GRAVITINO_CLIENT_CONFIG);
return ImmutableSet.of(
GRAVITINO_CLIENT_CONFIG, GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,11 @@ private GravitinoCatalogStoreFactoryOptions() {}
.stringType()
.noDefaultValue()
.withDescription("The name of Gravitino metalake");

public static final ConfigOption<String> GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG =
ConfigOptions.key("gravitino.allow.third-party-connector.list")
.stringType()
.noDefaultValue()
.withDescription("List of allow third-party connector of Gravitino, separated by commas");
public static final ConfigOption<Map<String, String>> GRAVITINO_CLIENT_CONFIG =
ConfigOptions.key("gravitino.client")
.mapType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@
import static org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions;
import static org.apache.flink.table.factories.FactoryUtil.validateWatermarkOptions;

import com.google.common.collect.ImmutableList;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactoryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +41,16 @@ private FactoryUtils() {}

private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class);

/** The list of Gravitino catalog factory identifiers. */
public static final ImmutableList<String> gravitinoFactoryList =
ImmutableList.<String>builder()
.add(GravitinoHiveCatalogFactoryOptions.IDENTIFIER)
.add(GravitinoIcebergCatalogFactoryOptions.IDENTIFIER)
.add(GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER)
.add(GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER)
.add(GravitinoPaimonCatalogFactoryOptions.IDENTIFIER)
.build();

/**
* Utility for working with {@link Factory}s. The {@link GravitinoCatalogFactoryHelper} override
* the {@link FactoryUtil.CatalogFactoryHelper#validate()} method to validate the options. For the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,29 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Arrays;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.junit.Before;
import org.junit.Test;

public class TestGravitinoCatalogStore {
private GravitinoCatalogManager gravitinoCatalogMockManager;
private GenericInMemoryCatalogStore memoryCatalogStore;
private GravitinoCatalogStore gravitinoCatalogStore;

@Before
public void setupCatalogStore() {
gravitinoCatalogMockManager = mock(GravitinoCatalogManager.class);
gravitinoCatalogStore = new GravitinoCatalogStore(gravitinoCatalogMockManager);
memoryCatalogStore = new GenericInMemoryCatalogStore();
memoryCatalogStore.open();
gravitinoCatalogStore =
new GravitinoCatalogStore(
gravitinoCatalogMockManager, memoryCatalogStore, Arrays.asList("filesystem", "jdbc"));
}

@Test
Expand Down Expand Up @@ -94,4 +104,35 @@ public void testRemoveCatalog_UnexpectedException_shouldThrow() {
}
verify(gravitinoCatalogMockManager).dropCatalog(catalogName);
}

@Test
public void testStoreJdbcCatalog() {
String catalogName = "testJdbcCatalog";
Configuration conf = new Configuration();
conf.set(CommonCatalogOptions.CATALOG_TYPE, "jdbc");
conf.setString("url", "jdbc:mysql://localhost:3306/test");
conf.setString("username", "test");
conf.setString("password", "test");
CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf);

gravitinoCatalogStore.storeCatalog(catalogName, descriptor);
assertTrue(gravitinoCatalogStore.contains(catalogName));
assertTrue(memoryCatalogStore.contains(catalogName));
}

@Test
public void testStoreHiveCatalogWithException() {
String catalogName = "testHiveCatalog";
Configuration conf = new Configuration();
conf.set(CommonCatalogOptions.CATALOG_TYPE, "hive");
conf.setString("hive-conf-dir", "/tmp");
CatalogDescriptor descriptor = CatalogDescriptor.of(catalogName, conf);

try {
gravitinoCatalogStore.storeCatalog(catalogName, descriptor);
fail("Should throw RuntimeException");
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("Failed to correctly match the Flink catalog factory."));
}
}
}
Loading
Loading