improve(flink): Support create third-party connectors using gravitino connector#10560
lasdf1234 wants to merge 11 commits into apache:main
Pull request overview
This PR extends the Gravitino Flink connector’s Catalog Store implementation to support mixing Gravitino-managed catalogs with selected “third-party” Flink catalog types by routing those catalogs to Flink’s default in-memory catalog store.
Changes:
- Add a new Flink configuration option to allow-list third-party catalog types to be handled by an in-memory catalog store.
- Update `GravitinoCatalogStore` / factory to delegate store/remove/get/list/contains operations to either Gravitino or the in-memory store based on catalog type.
- Add unit tests and update Flink connector documentation to describe the new configuration.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java | Routes certain catalog types to GenericInMemoryCatalogStore and merges catalog visibility across both stores. |
| flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java | Parses allow-list config, initializes in-memory store, and validates that allow-list doesn’t include Gravitino connector identifiers. |
| flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java | Introduces a new config option for allowed third-party connector types. |
| flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/utils/FactoryUtils.java | Adds a centralized list of Gravitino catalog factory identifiers for validation. |
| flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoFlinkConfig.java | Adds validation-focused tests for the new allow-list option. |
| flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/store/TestGravitinoCatalogStore.java | Adds tests for storing a third-party catalog in the in-memory store. |
| docs/flink-connector/flink-connector.md | Documents the new allow-list configuration and provides examples. |
Comments suppressed due to low confidence (1)
flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactory.java:97
`memoryCatalogStore` is opened before option validation and before creating the `GravitinoCatalogManager`. If `factoryHelper.validate()` or the subsequent precondition checks / `GravitinoCatalogManager.create(...)` throw, the in-memory store will remain open and never be closed. Consider opening it only after all validations succeed, or ensure it is closed in a `catch`/`finally` when `open(...)` fails.
    public void open(Context context) throws CatalogException {
      this.memoryCatalogStore = new GenericInMemoryCatalogStore();
      this.memoryCatalogStore.open();
      FactoryUtil.FactoryHelper<CatalogStoreFactory> factoryHelper =
          createCatalogStoreFactoryHelper(this, context);
      factoryHelper.validate();
      ReadableConfig options = factoryHelper.getOptions();
      String gravitinoUri = options.get(GRAVITINO_URI);
      String gravitinoName = options.get(GRAVITINO_METALAKE);
      Preconditions.checkArgument(
          gravitinoUri != null && gravitinoName != null,
          "Both %s and %s must be set",
          GRAVITINO_URI.key(),
          GRAVITINO_METALAKE.key());
      allowThirdPartyConnectors =
          Arrays.asList(
                  Optional.ofNullable(options.get(GRAVITINO_ALLOW_THIRD_PARTY_CONNECTOR_LIST_CONFIG))
                      .map(s -> s.split(","))
                      .orElse(new String[] {}))
              .stream()
              .map(String::trim)
              .filter(s -> !s.isEmpty())
              .collect(Collectors.toList());
      Preconditions.checkArgument(
          allowThirdPartyConnectors.stream().noneMatch(FactoryUtils.gravitinoFactoryList::contains),
          "The allowed third party connectors %s should not contain Gravitino connectors %s.",
          allowThirdPartyConnectors,
          FactoryUtils.gravitinoFactoryList);
      this.catalogManager =
          GravitinoCatalogManager.create(gravitinoUri, gravitinoName, extractClientConfig(options));
    }
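One way to act on the comment above is sketched here. It is a minimal illustration with stand-in types, not the PR's code: `SafeOpenSketch`, `FakeMemoryStore`, and the `createManager` callback are hypothetical names, with `createManager` standing in for `GravitinoCatalogManager.create(...)`. The idea is to validate first, open the in-memory store second, and close it again if a later step throws.

```java
/** Illustrative stand-in for the factory's open(...) logic; all names here are hypothetical. */
class SafeOpenSketch {

  /** Hypothetical replacement for GenericInMemoryCatalogStore, reduced to an open flag. */
  static class FakeMemoryStore implements AutoCloseable {
    boolean open;

    void open() {
      open = true;
    }

    @Override
    public void close() {
      open = false;
    }
  }

  FakeMemoryStore memoryStore;

  /** Validates options before opening the store and closes it if a later step fails. */
  void open(String uri, String metalake, Runnable createManager) {
    // Validation runs before any resource is opened, so a failure here leaks nothing.
    if (uri == null || metalake == null) {
      throw new IllegalArgumentException("Both uri and metalake must be set");
    }

    FakeMemoryStore store = new FakeMemoryStore();
    store.open();
    try {
      // Stands in for GravitinoCatalogManager.create(...), which may throw.
      createManager.run();
    } catch (RuntimeException e) {
      store.close();
      throw e;
    }
    // The field is assigned only once every step has succeeded.
    this.memoryStore = store;
  }
}
```

With this ordering, a missing URI or a failing manager creation leaves `memoryStore` unset and no store left open.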
…/connector/store/GravitinoCatalogStoreFactoryOptions.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This PR needs a clearer user-facing semantic definition before we look at the implementation details. By introducing an in-memory catalog store alongside the existing Gravitino-backed store, this change is no longer only "allow third-party connectors". It also changes the semantics of catalog lifecycle operations from the user perspective. I think the PR description and docs should first clarify:
Right now, the implementation seems to introduce a mixed two-store model, so users need a precise contract for catalog visibility, persistence, and conflict handling. Without that, it is hard to evaluate whether the current behavior is correct or whether edge cases like duplicate names and shadowing are acceptable.
@FANNG1 I have supplemented the PR.
What changes were proposed in this pull request?
This PR introduces a mechanism to allow third-party connectors (such as standard Flink connectors like `jdbc`, `hive`) to be used alongside Gravitino-managed catalogs in the Flink connector.
Details are as follows:
1. When a catalog is created, it is stored in Gravitino if it is a built-in Gravitino catalog; a third-party catalog is stored in memory.
2. Both GET and USE CATALOG first look up the catalog in memory. If it does not exist in memory, Flink retrieves it from Gravitino.
3. DROP CATALOG first deletes the catalog from memory. If it does not exist in memory, the catalog is deleted from Gravitino.
4. SHOW/LIST CATALOGS returns the union of the catalogs in memory and in Gravitino.
5. Third-party catalogs are session-scoped only; they disappear after Flink restarts.
6. If the same catalog name exists in both memory and Gravitino, Flink uses the one in memory.
In summary, a catalog stored in memory takes priority over a catalog stored in Gravitino.
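The rules above can be sketched as a small routing class. This is an illustration under stated assumptions, not the PR's `GravitinoCatalogStore`: plain maps stand in for the in-memory store and for Gravitino, catalogs are reduced to a name and a type string, and `TwoStoreRoutingSketch` and its method names are hypothetical. Listing is read here as returning names from both stores, in line with the "merges catalog visibility across both stores" wording in the file summary.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Illustrative two-store routing; maps replace the real in-memory and Gravitino stores. */
class TwoStoreRoutingSketch {
  private final Map<String, String> memoryStore = new LinkedHashMap<>();
  private final Map<String, String> gravitinoStore = new LinkedHashMap<>();
  private final Set<String> allowedThirdPartyTypes;

  TwoStoreRoutingSketch(Set<String> allowedThirdPartyTypes) {
    this.allowedThirdPartyTypes = allowedThirdPartyTypes;
  }

  /** Rule 1: allow-listed types are kept in memory, all other types go to Gravitino. */
  void store(String name, String type) {
    if (allowedThirdPartyTypes.contains(type)) {
      memoryStore.put(name, type);
    } else {
      gravitinoStore.put(name, type);
    }
  }

  /** Rules 2 and 6: memory is consulted first, so it shadows a same-named Gravitino catalog. */
  Optional<String> get(String name) {
    String type = memoryStore.get(name);
    return Optional.ofNullable(type != null ? type : gravitinoStore.get(name));
  }

  /** Rule 3: remove from memory if present there, otherwise remove from Gravitino. */
  void remove(String name) {
    if (memoryStore.remove(name) == null) {
      gravitinoStore.remove(name);
    }
  }

  /** Rule 4 (as read here): catalog names from both stores, without duplicates. */
  Set<String> list() {
    Set<String> names = new LinkedHashSet<>(memoryStore.keySet());
    names.addAll(gravitinoStore.keySet());
    return names;
  }
}
```

One consequence worth noting in this sketch: dropping a shadowing in-memory catalog makes the same-named Gravitino catalog visible again, and a second drop then removes the Gravitino one.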
Why are the changes needed?
Previously, the Gravitino Flink connector assumed all catalogs were Gravitino-managed. This prevented users from
using standard Flink connectors (e.g., for temporary catalogs not yet supported or governed by Gravitino)
within the same environment. This change provides flexibility to mix governed and ungoverned catalogs.
Fix: #10555
Does this PR introduce any user-facing change?
Yes. Users can configure `table.catalog-store.gravitino.gravitino.allow.third-party-connector.list` in `flink-conf.yaml` or via `TableEnvironment` to specify a comma-separated list of catalog types (e.g., `jdbc,hive`) that should be handled by Flink's default in-memory store.
How was this patch tested?
Existing ITs