Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/backend-integration-test-action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ jobs:
fi

./gradlew test -PskipTests -PtestMode=${{ inputs.test-mode }} -PjdbcBackend=${{ inputs.backend }} -PskipDockerTests=false \
-x :web:web:test -x :web:integration-test:test -x :web-v2:web:test -x :web-v2:integration-test:test -x :clients:client-python:test -x :flink-connector:flink:test -x :spark-connector:spark-common:test \
-x :web:web:test -x :web:integration-test:test -x :web-v2:web:test -x :web-v2:integration-test:test -x :clients:client-python:test \
-x :flink-connector:flink-common:test -x :flink-connector:flink-1.18:test -x :flink-connector:flink-1.19:test -x :flink-connector:flink-1.20:test \
-x :flink-connector:flink-runtime-1.18:test -x :flink-connector:flink-runtime-1.19:test -x :flink-connector:flink-runtime-1.20:test \
-x :spark-connector:spark-common:test \
-x :spark-connector:spark-3.3:test -x :spark-connector:spark-3.4:test -x :spark-connector:spark-3.5:test \
-x :spark-connector:spark-runtime-3.3:test -x :spark-connector:spark-runtime-3.4:test -x :spark-connector:spark-runtime-3.5:test \
-x :trino-connector:integration-test:test -x :trino-connector:trino-connector:test \
Expand Down
19 changes: 13 additions & 6 deletions .github/workflows/flink-integration-test-action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,13 @@ jobs:

- name: Flink Integration Test
id: integrationTest
# run embedded mode and deploy mode integration tests
run: |
./gradlew -PskipTests -PtestMode=embedded -PskipDockerTests=false :flink-connector:flink:test --tests "org.apache.gravitino.flink.connector.integration.test.**"
./gradlew -PskipTests -PtestMode=deploy -PskipDockerTests=false :flink-connector:flink:test --tests "org.apache.gravitino.flink.connector.integration.test.**"
./gradlew -PskipTests -PtestMode=embedded -PskipDockerTests=false :flink-connector:flink-1.18:test --tests "org.apache.gravitino.flink.connector.integration.test.**"
./gradlew -PskipTests -PtestMode=deploy -PskipDockerTests=false :flink-connector:flink-1.18:test --tests "org.apache.gravitino.flink.connector.integration.test.**"
dev/ci/run_flink_connector_smoke.sh 1.19 embedded
dev/ci/run_flink_connector_smoke.sh 1.19 deploy
dev/ci/run_flink_connector_smoke.sh 1.20 embedded
dev/ci/run_flink_connector_smoke.sh 1.20 deploy

- name: Upload integrate tests reports
uses: actions/upload-artifact@v4
Expand All @@ -58,9 +61,13 @@ jobs:
name: flink-connector-integrate-test-reports-${{ inputs.java-version }}
path: |
build/reports
flink-connector/flink/build/*.log
flink-connector/flink/build/*.tar
flink-connector/v1.18/flink/build/*.log
flink-connector/v1.18/flink/build/*.tar
flink-connector/v1.19/flink/build/*.log
flink-connector/v1.19/flink/build/*.tar
flink-connector/v1.20/flink/build/*.log
flink-connector/v1.20/flink/build/*.tar
distribution/package/logs/gravitino-server.out
distribution/package/logs/gravitino-server.log
catalogs/**/*.log
catalogs/**/*.tar
catalogs/**/*.tar
66 changes: 66 additions & 0 deletions dev/ci/run_flink_connector_smoke.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Runs the Flink connector smoke test (GravitinoCatalogManagerIT<ver>) for one
# supported Flink minor version in either "embedded" or "deploy" test mode.
#
# Usage: run_flink_connector_smoke.sh <1.18|1.19|1.20> <embedded|deploy>

set -euo pipefail

if [[ $# -ne 2 ]]; then
  echo "Usage: $0 <1.18|1.19|1.20> <embedded|deploy>" >&2
  exit 1
fi

readonly flink_version="$1"
readonly test_mode="$2"

# Validate both arguments up front so a typo fails fast instead of launching
# Gradle with a nonexistent task or mode.
case "$flink_version" in
  1.18 | 1.19 | 1.20)
    ;;
  *)
    echo "Unsupported Flink version: ${flink_version}" >&2
    exit 1
    ;;
esac

case "$test_mode" in
  embedded | deploy)
    ;;
  *)
    echo "Unsupported test mode: ${test_mode}" >&2
    exit 1
    ;;
esac

# Both the Gradle task path and the smoke-test class follow a fixed naming
# scheme, so derive them from the version instead of duplicating them per
# branch (1.18 -> :flink-connector:flink-1.18:test / ...IT118). Supporting a
# new minor only requires extending the validation case above.
readonly test_task=":flink-connector:flink-${flink_version}:test"
readonly smoke_test="org.apache.gravitino.flink.connector.integration.test.catalog.GravitinoCatalogManagerIT${flink_version//./}"

# Force UTF-8 decoding on CI runners while preserving any JAVA_TOOL_OPTIONS
# already set by the environment.
export JAVA_TOOL_OPTIONS="${JAVA_TOOL_OPTIONS:+${JAVA_TOOL_OPTIONS} }-Dfile.encoding=UTF-8"

./gradlew \
  "${test_task}" \
  -PskipTests \
  "-PtestMode=${test_mode}" \
  -PskipDockerTests=false \
  --tests "${smoke_test}" \
  --console=plain
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,18 @@ repositories {
mavenCentral()
}

var paimonVersion: String = libs.versions.paimon.get()
val flinkVersion: String = libs.versions.flink.get()
val flinkVersion: String = libs.versions.flink18.get()
val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")

val icebergVersion: String = libs.versions.iceberg4connector.get()
val icebergVersion: String = libs.versions.iceberg4flink18.get()
val paimonVersion: String = libs.versions.paimon4flink18.get()

// The Flink only support scala 2.12, and all scala api will be removed in a future version.
// You can find more detail at the following issues:
// https://issues.apache.org/jira/browse/FLINK-23986,
// https://issues.apache.org/jira/browse/FLINK-20845,
// https://issues.apache.org/jira/browse/FLINK-13414.
val scalaVersion: String = "2.12"
val artifactName = "${rootProject.name}-flink-${flinkMajorVersion}_$scalaVersion"
val artifactName = "${rootProject.name}-flink-common"

dependencies {
implementation(project(":catalogs:catalog-common")) {
Expand All @@ -47,13 +46,12 @@ dependencies {
implementation(libs.guava)

compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))

compileOnly("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
compileOnly("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion")
compileOnly(libs.flinkjdbc)
compileOnly("org.apache.paimon:paimon-flink-$flinkMajorVersion:$paimonVersion")
compileOnly(libs.flinkjdbc18)

compileOnly(libs.hive2.exec) {
artifact {
Expand Down Expand Up @@ -96,7 +94,7 @@ dependencies {
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.metrics.core)
testImplementation(libs.flinkjdbc)
testImplementation(libs.flinkjdbc18)
testImplementation(libs.minikdc)

testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
Expand Down Expand Up @@ -182,7 +180,6 @@ dependencies {
tasks.test {
val skipITs = project.hasProperty("skipITs")
if (skipITs) {
// Exclude integration tests
exclude("**/integration/test/**")
} else {
dependsOn(tasks.jar)
Expand All @@ -194,10 +191,33 @@ tasks.withType<Jar> {
archiveBaseName.set(artifactName)
}

// Packages the compiled test classes into a "-tests" jar.
// NOTE(review): presumably consumed by the versioned flink-1.x modules to
// reuse the shared test fixtures — confirm against their build scripts.
val testJar by tasks.registering(Jar::class) {
archiveClassifier.set("tests")
archiveBaseName.set(artifactName)
from(sourceSets["test"].output)
}

// Custom configuration through which the test jar is exposed to consumers.
configurations {
create("testArtifacts")
}

artifacts {
add("testArtifacts", testJar)
}

publishing {
publications {
withType<MavenPublication>().configureEach {
// Publish under the module's artifact name (rootProject.name + "-flink-common")
// instead of the default Gradle project name.
artifactId = artifactName
}
}
}

tasks.clean {
// Remove Derby files left in the project directory by metastore-backed tests.
delete("derby.log")
delete("metastore_db")
}

tasks.named<Jar>("sourcesJar") {
// NOTE(review): excludes duplicate entries when assembling sources — confirm
// which overlapping source sets make this necessary.
duplicatesStrategy = DuplicatesStrategy.EXCLUDE
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.SchemaAndTablePropertiesConverter;
import org.apache.gravitino.flink.connector.utils.CatalogCompat;
import org.apache.gravitino.flink.connector.utils.DefaultCatalogCompat;
import org.apache.gravitino.flink.connector.utils.TableUtils;
import org.apache.gravitino.flink.connector.utils.TypeUtils;
import org.apache.gravitino.rel.Column;
Expand Down Expand Up @@ -570,7 +572,21 @@ protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
schemaAndTablePropertiesConverter.toFlinkTableProperties(
catalogOptions, table.properties(), tablePath);
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
return newCatalogTable(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
}

/**
 * Builds a Flink {@link CatalogTable} from the converted schema, comment, partition keys, and
 * table options. Delegates to the {@link CatalogCompat} returned by {@link #catalogCompat()} so
 * version-specific subclasses can adapt to catalog/table API differences between Flink minors.
 *
 * @param schema the unresolved Flink schema for the table
 * @param comment the table comment
 * @param partitionKeys the Flink partition key names
 * @param options the Flink table properties/options
 * @return the constructed catalog table
 */
protected CatalogTable newCatalogTable(
org.apache.flink.table.api.Schema schema,
String comment,
List<String> partitionKeys,
Map<String, String> options) {
return catalogCompat().createCatalogTable(schema, comment, partitionKeys, options);
}

/**
 * Returns the {@link CatalogCompat} used to construct catalog tables.
 *
 * <p>Versioned catalog entry classes override this hook when the Flink minor has a different
 * catalog/table API path; the default is {@link DefaultCatalogCompat#INSTANCE}.
 */
protected CatalogCompat catalogCompat() {
return DefaultCatalogCompat.INSTANCE;
}

private static Optional<List<String>> getFlinkPrimaryKey(Table table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.utils.CatalogCompat;
import org.apache.gravitino.flink.connector.utils.DefaultCatalogCompat;
import org.apache.gravitino.rel.Table;

final class FlinkGenericTableUtil {
Expand Down Expand Up @@ -62,7 +65,12 @@ static boolean isGenericTableWhenLoad(Map<String, String> properties) {
}

static Map<String, String> toGravitinoGenericTableProperties(ResolvedCatalogTable resolvedTable) {
Map<String, String> properties = CatalogPropertiesUtil.serializeCatalogTable(resolvedTable);
return toGravitinoGenericTableProperties(resolvedTable, DefaultCatalogCompat.INSTANCE);
}

static Map<String, String> toGravitinoGenericTableProperties(
ResolvedCatalogTable resolvedTable, CatalogCompat catalogCompat) {
Map<String, String> properties = catalogCompat.serializeCatalogTable(resolvedTable);
if (!properties.containsKey(CONNECTOR)) {
properties.put(CONNECTOR, MANAGED_TABLE_IDENTIFIER);
}
Expand All @@ -72,6 +80,14 @@ static Map<String, String> toGravitinoGenericTableProperties(ResolvedCatalogTabl
}

static CatalogTable toFlinkGenericTable(Table table) {
return toFlinkGenericTable(table, DefaultCatalogCompat.INSTANCE);
}

static CatalogTable toFlinkGenericTable(Table table, CatalogCompat catalogCompat) {
return toFlinkGenericTable(table, catalogCompat::createCatalogTable);
}

static CatalogTable toFlinkGenericTable(Table table, CatalogTableBuilder tableBuilder) {
Map<String, String> flinkProperties = unmaskFlinkProperties(table.properties());
CatalogTable catalogTable = CatalogPropertiesUtil.deserializeCatalogTable(flinkProperties);
if (catalogTable.getUnresolvedSchema().getColumns().isEmpty()) {
Expand All @@ -82,13 +98,22 @@ static CatalogTable toFlinkGenericTable(Table table) {
if (MANAGED_TABLE_IDENTIFIER.equalsIgnoreCase(options.get(CONNECTOR))) {
options.remove(CONNECTOR);
}
return CatalogTable.of(
return tableBuilder.create(
catalogTable.getUnresolvedSchema(),
catalogTable.getComment(),
catalogTable.getPartitionKeys(),
options);
}

/**
 * Factory for building a {@link CatalogTable} from its unresolved schema, comment, partition
 * keys, and options. Allows callers to supply a Flink-version-specific construction path, e.g.
 * via a {@code CatalogCompat} method reference.
 */
@FunctionalInterface
interface CatalogTableBuilder {
CatalogTable create(
org.apache.flink.table.api.Schema schema,
String comment,
List<String> partitionKeys,
Map<String, String> options);
}

private static String getConnectorFromProperties(Map<String, String> properties) {
String connector = properties.get(CatalogPropertiesUtil.FLINK_PROPERTY_PREFIX + CONNECTOR);
if (connector == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
Expand Down Expand Up @@ -111,8 +112,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig

NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
Map<String, String> properties =
FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable);
Map<String, String> properties = toGravitinoGenericTableProperties(resolvedTable);

try {
catalog()
Expand Down Expand Up @@ -146,7 +146,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
.asTableCatalog()
.loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
if (FlinkGenericTableUtil.isGenericTableWhenLoad(table.properties())) {
return FlinkGenericTableUtil.toFlinkGenericTable(table);
return toFlinkGenericTable(table);
}
return super.toFlinkTable(table, tablePath);
} catch (NoSuchTableException e) {
Expand Down Expand Up @@ -218,8 +218,7 @@ private void applyGenericTableAlter(
throws TableNotExistException, CatalogException {
NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName());
Map<String, String> updatedProperties =
FlinkGenericTableUtil.toGravitinoGenericTableProperties(newTable);
Map<String, String> updatedProperties = toGravitinoGenericTableProperties(newTable);
Map<String, String> currentProperties =
existingTable.properties() == null ? Collections.emptyMap() : existingTable.properties();

Expand Down Expand Up @@ -252,4 +251,13 @@ private void applyGenericTableAlter(
throw new CatalogException(e);
}
}

/**
 * Serializes {@code resolvedTable} into Gravitino generic-table properties using this catalog's
 * {@code CatalogCompat}, so version-specific subclasses control the serialization path.
 */
protected Map<String, String> toGravitinoGenericTableProperties(
ResolvedCatalogTable resolvedTable) {
return FlinkGenericTableUtil.toGravitinoGenericTableProperties(resolvedTable, catalogCompat());
}

/** Rebuilds a Flink {@link CatalogTable} from a Gravitino generic {@link Table}. */
protected CatalogTable toFlinkGenericTable(Table table) {
return FlinkGenericTableUtil.toFlinkGenericTable(table, catalogCompat());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Catalog createCatalog(Context context) {
PropertyUtils.getHadoopAndHiveProperties(context.getOptions()).forEach(hiveConf::set);
SchemaAndTablePropertiesConverter tablePropertiesConverter =
new HiveSchemaAndTablePropertiesConverter(hiveConf);
return new GravitinoHiveCatalog(
return newCatalog(
context.getName(),
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
context.getOptions(),
Expand All @@ -73,6 +73,24 @@ public Catalog createCatalog(Context context) {
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
}

/**
 * Instantiates the Hive-backed Gravitino catalog.
 *
 * <p>Extracted as a protected hook so factory subclasses can return a different catalog
 * implementation — NOTE(review): presumably overridden by the versioned Flink modules; confirm.
 *
 * @param catalogName the Flink catalog name from the factory context
 * @param defaultDatabase the default database configured for the catalog
 * @param catalogOptions the raw catalog options from the factory context
 * @param schemaAndTablePropertiesConverter converter between Flink and Gravitino schema/table
 *     properties
 * @param partitionConverter converter for partition keys/specs
 * @param hiveConf the Hive configuration, may be {@code null}
 * @param hiveVersion the Hive version string, may be {@code null}
 * @return the created catalog instance
 */
protected Catalog newCatalog(
String catalogName,
String defaultDatabase,
java.util.Map<String, String> catalogOptions,
SchemaAndTablePropertiesConverter schemaAndTablePropertiesConverter,
PartitionConverter partitionConverter,
@javax.annotation.Nullable HiveConf hiveConf,
@javax.annotation.Nullable String hiveVersion) {
return new GravitinoHiveCatalog(
catalogName,
defaultDatabase,
catalogOptions,
schemaAndTablePropertiesConverter,
partitionConverter,
hiveConf,
hiveVersion);
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
Expand Down
Loading
Loading