Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/flink-connector/flink-connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@ This capability allows users to perform federation queries, accessing data from
| table.catalog-store.kind | string | generic_in_memory | The Catalog Store name, it should set to `gravitino`. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.metalake | string | (none) | The metalake name that flink connector used to request to Gravitino. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.uri | string | (none) | The uri of Gravitino server address. | Yes | 0.6.0-incubating |
| table.catalog-store.gravitino.gravitino.allow.third-party-connector.list | string | (none) | List of allowed third-party connectors, separated by commas. | No | 1.3.0 |
| table.catalog-store.gravitino.gravitino.client. | string | (none) | The configuration key prefix for the Gravitino client config. | No | 1.0.0 |

The `table.catalog-store.gravitino.gravitino.allow.third-party-connector.list` configuration allows you to specify a comma-separated list of catalog types. Catalogs created with these types will be stored in Flink's in-memory catalog store and will not be managed by Gravitino. This is useful for using standard Flink connectors (like `jdbc`, `hive`) that you do not wish to be governed by Gravitino.

To configure the Gravitino client, use properties prefixed with `table.catalog-store.gravitino.gravitino.client.`. These properties will be passed to the Gravitino client after removing the `table.catalog-store.gravitino.` prefix.

**Example:** Setting `table.catalog-store.gravitino.gravitino.client.socketTimeoutMs` is equivalent to setting `gravitino.client.socketTimeoutMs` for the Gravitino client.
Expand All @@ -48,6 +51,7 @@ Set the flink configuration in flink-conf.yaml.
table.catalog-store.kind: gravitino
table.catalog-store.gravitino.gravitino.metalake: metalake_demo
table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
table.catalog-store.gravitino.gravitino.allow.third-party-connector.list: hive,jdbc
table.catalog-store.gravitino.gravitino.client.socketTimeoutMs: 60000
table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs: 60000
```
Expand All @@ -57,6 +61,7 @@ final Configuration configuration = new Configuration();
configuration.setString("table.catalog-store.kind", "gravitino");
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", "metalake_demo");
configuration.setString("table.catalog-store.gravitino.gravitino.uri", "http://localhost:8090");
configuration.setString("table.catalog-store.gravitino.gravitino.allow.third-party-connector.list", "hive,jdbc");
configuration.setString("table.catalog-store.gravitino.gravitino.client.socketTimeoutMs", "60000");
configuration.setString("table.catalog-store.gravitino.gravitino.client.connectionTimeoutMs", "60000");
EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance().withConfiguration(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

package org.apache.gravitino.flink.connector.store;

import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -31,6 +33,7 @@
import org.apache.flink.table.catalog.AbstractCatalogStore;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.util.Preconditions;
Expand All @@ -45,26 +48,43 @@
/** GravitinoCatalogStore is used to store catalog information to Apache Gravitino server. */
public class GravitinoCatalogStore extends AbstractCatalogStore {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoCatalogStore.class);
private final GenericInMemoryCatalogStore memoryCatalogStore;
private final GravitinoCatalogManager gravitinoCatalogManager;
private final ImmutableList<String> allowThirdPartyConnectors;

public GravitinoCatalogStore(GravitinoCatalogManager catalogManager) {
this.gravitinoCatalogManager = catalogManager;
public GravitinoCatalogStore(
GravitinoCatalogManager catalogManager,
GenericInMemoryCatalogStore memoryCatalogStore,
List<String> allowThirdPartyConnectors) {
this.gravitinoCatalogManager =
Preconditions.checkNotNull(catalogManager, "CatalogManager cannot be null");
this.memoryCatalogStore =
Preconditions.checkNotNull(memoryCatalogStore, "MemoryCatalogStore cannot be null");
this.allowThirdPartyConnectors =
ImmutableList.copyOf(
Preconditions.checkNotNull(
allowThirdPartyConnectors, "AllowThirdPartyConnectors cannot be null"));
}

@Override
public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
throws CatalogException {
Configuration configuration = descriptor.getConfiguration();
Map<String, String> gravitino = configuration.toMap();
BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino);
Map<String, String> gravitinoProperties =
catalogFactory.catalogPropertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
null,
catalogFactory.gravitinoCatalogProvider(),
gravitinoProperties);
String catalogType = descriptor.getConfiguration().get(CommonCatalogOptions.CATALOG_TYPE);
if (isAllowThirdPartyConnector(catalogType)) {
memoryCatalogStore.storeCatalog(catalogName, descriptor);
} else {
Configuration configuration = descriptor.getConfiguration();
Map<String, String> gravitino = configuration.toMap();
BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino);
Map<String, String> gravitinoProperties =
catalogFactory.catalogPropertiesConverter().toGravitinoCatalogProperties(configuration);
gravitinoCatalogManager.createCatalog(
catalogName,
catalogFactory.gravitinoCatalogType(),
null,
catalogFactory.gravitinoCatalogProvider(),
gravitinoProperties);
}
}
/**
* Removes the specified catalog.
Expand All @@ -75,13 +95,19 @@ public void storeCatalog(String catalogName, CatalogDescriptor descriptor)
*/
@Override
public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
try {
boolean isDropped = gravitinoCatalogManager.dropCatalog(catalogName);
if (!ignoreIfNotExists && !isDropped) {
throw new CatalogException(String.format("Failed to remove the catalog: %s", catalogName));
if (memoryCatalogStore.contains(catalogName)) {
memoryCatalogStore.removeCatalog(catalogName, ignoreIfNotExists);
} else {
try {
boolean isDropped = gravitinoCatalogManager.dropCatalog(catalogName);
if (!ignoreIfNotExists && !isDropped) {
throw new CatalogException(
String.format("Failed to remove the catalog: %s", catalogName));
}
} catch (Exception e) {
throw new CatalogException(
String.format("Failed to remove the catalog: %s", catalogName), e);
}
} catch (Exception e) {
throw new CatalogException(String.format("Failed to remove the catalog: %s", catalogName), e);
}
}

Expand All @@ -94,6 +120,12 @@ public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws
*/
@Override
public Optional<CatalogDescriptor> getCatalog(String catalogName) throws CatalogException {
if (memoryCatalogStore.contains(catalogName)) {
Optional<CatalogDescriptor> descriptor = memoryCatalogStore.getCatalog(catalogName);
if (descriptor.isPresent()) {
return descriptor;
}
}
try {
Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName);
BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider());
Expand All @@ -113,16 +145,20 @@ public Optional<CatalogDescriptor> getCatalog(String catalogName) throws Catalog

@Override
public Set<String> listCatalogs() throws CatalogException {
Set<String> catalogs = new HashSet<>();
catalogs.addAll(memoryCatalogStore.listCatalogs());
try {
return gravitinoCatalogManager.listCatalogs();
catalogs.addAll(gravitinoCatalogManager.listCatalogs());
} catch (Exception e) {
throw new CatalogException("Failed to list catalog.", e);
}
return catalogs;
}

@Override
public boolean contains(String catalogName) throws CatalogException {
return gravitinoCatalogManager.contains(catalogName);
return memoryCatalogStore.contains(catalogName)
|| gravitinoCatalogManager.contains(catalogName);
}

private BaseCatalogFactory getCatalogFactory(Map<String, String> configuration) {
Expand Down Expand Up @@ -176,4 +212,8 @@ private BaseCatalogFactory discoverFactories(Predicate<Factory> predicate, Strin
}
return (BaseCatalogFactory) factories.get(0);
}

private boolean isAllowThirdPartyConnector(String type) {
return allowThirdPartyConnectors.contains(type);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,48 @@

import static org.apache.flink.table.factories.FactoryUtil.createCatalogStoreFactoryHelper;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_CLIENT_CONFIG;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_METALAKE;
import static org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions.GRAVITINO_URI;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.GenericInMemoryCatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.client.GravitinoClientConfiguration;
import org.apache.gravitino.flink.connector.catalog.GravitinoCatalogManager;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;

/** The Factory for creating {@link GravitinoCatalogStore}. */
public class GravitinoCatalogStoreFactory implements CatalogStoreFactory {

private GravitinoCatalogManager catalogManager;
private GenericInMemoryCatalogStore memoryCatalogStore;
private List<String> allowThirdPartyConnectors;

@Override
public CatalogStore createCatalogStore() {
return new GravitinoCatalogStore(catalogManager);
return new GravitinoCatalogStore(catalogManager, memoryCatalogStore, allowThirdPartyConnectors);
}

@Override
public void open(Context context) throws CatalogException {
this.memoryCatalogStore = new GenericInMemoryCatalogStore();
this.memoryCatalogStore.open();

FactoryUtil.FactoryHelper<CatalogStoreFactory> factoryHelper =
createCatalogStoreFactoryHelper(this, context);
factoryHelper.validate();
Expand All @@ -64,13 +76,31 @@ public void open(Context context) throws CatalogException {
"Both %s and %s must be set",
GRAVITINO_URI.key(),
GRAVITINO_METALAKE.key());
allowThirdPartyConnectors =
Arrays.asList(
Optional.ofNullable(options.get(GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG))
.map(s -> s.split(","))
.orElse(new String[] {}))
.stream()
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());

Preconditions.checkArgument(
allowThirdPartyConnectors.stream().noneMatch(FactoryUtils.GRAVITINO_FACTORY_LIST::contains),
"The allowed third party connectors %s should not contain Gravitino connectors %s.",
allowThirdPartyConnectors,
FactoryUtils.GRAVITINO_FACTORY_LIST);

this.catalogManager =
GravitinoCatalogManager.create(gravitinoUri, gravitinoName, extractClientConfig(options));
}

@Override
public void close() throws CatalogException {
if (memoryCatalogStore != null) {
memoryCatalogStore.close();
}
if (catalogManager != null) {
catalogManager.close();
}
Expand All @@ -88,7 +118,8 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return ImmutableSet.of(GRAVITINO_CLIENT_CONFIG);
return ImmutableSet.of(
GRAVITINO_CLIENT_CONFIG, GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ private GravitinoCatalogStoreFactoryOptions() {}
.stringType()
.noDefaultValue()
.withDescription("The name of Gravitino metalake");

public static final ConfigOption<String> GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG =
ConfigOptions.key("gravitino.allow.third-party-connector.list")
.stringType()
.noDefaultValue()
.withDescription(
"Comma-separated list of allowed third-party catalog types (handled by Flink's in-memory catalog store)");
public static final ConfigOption<Map<String, String>> GRAVITINO_CLIENT_CONFIG =
ConfigOptions.key("gravitino.client")
.mapType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@
import static org.apache.flink.table.factories.FactoryUtil.validateFactoryOptions;
import static org.apache.flink.table.factories.FactoryUtil.validateWatermarkOptions;

import com.google.common.collect.ImmutableList;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.jdbc.GravitinoJdbcCatalogFactoryOptions;
import org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactoryOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,6 +41,16 @@ private FactoryUtils() {}

private static final Logger LOG = LoggerFactory.getLogger(FactoryUtils.class);

/** The list of Gravitino catalog factory identifiers. */
public static final ImmutableList<String> GRAVITINO_FACTORY_LIST =
ImmutableList.<String>builder()
.add(GravitinoHiveCatalogFactoryOptions.IDENTIFIER)
.add(GravitinoIcebergCatalogFactoryOptions.IDENTIFIER)
.add(GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER)
.add(GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER)
.add(GravitinoPaimonCatalogFactoryOptions.IDENTIFIER)
.build();

/**
* Utility for working with {@link Factory}s. The {@link GravitinoCatalogFactoryHelper} override
* the {@link FactoryUtil.CatalogFactoryHelper#validate()} method to validate the options. For the
Expand Down
Loading