diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 2427fd4f01fa2..5779691b1769c 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -72,3 +72,20 @@ This change added tests and can be verified as follows: - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) + +--- + +##### Was generative AI tooling used to co-author this PR? + + + +- [ ] Yes (please specify the tool below) + + diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000000000..bd230dcfca7fb --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,335 @@ + + +# Flink AI Agent Instructions + +This file provides guidance for AI coding agents working with the Apache Flink codebase. + +## Prerequisites + +- Java 11, 17 (default), or 21. Java 11 syntax must be used in all modules. Java 17 syntax (records, sealed classes, pattern matching) is only permitted in the `flink-tests-java17` module. 
+- Maven 3.8.6 (Maven wrapper `./mvnw` included; prefer it) +- Git +- Unix-like environment (Linux, macOS, WSL, Cygwin) + +## Commands + +### Build + +- Fast dev build: `./mvnw clean install -DskipTests -Dfast -Pskip-webui-build -T1C` +- Full build (Java 17 default): `./mvnw clean package -DskipTests -Djdk17 -Pjava17-target` +- Java 11: `./mvnw clean package -DskipTests -Djdk11 -Pjava11-target` +- Java 21: `./mvnw clean package -DskipTests -Djdk21 -Pjava21-target` +- Full build with tests: `./mvnw clean verify` +- Single module: `./mvnw clean package -DskipTests -pl flink-core-api` +- Single module with tests: `./mvnw clean verify -pl flink-core-api` + +### Testing + +- Single test class: `./mvnw -pl flink-core-api -Dtest=MemorySizeTest test` +- Single test method: `./mvnw -pl flink-core-api -Dtest=MemorySizeTest#testParseBytes test` + +### Code Quality + +- Format code (Java + Scala): `./mvnw spotless:apply` +- Check formatting: `./mvnw spotless:check` +- Checkstyle: `./mvnw checkstyle:check -T1C` +- Checkstyle config: `tools/maven/checkstyle.xml` + +## Repository Structure + +This section lists every module from the root `pom.xml`, organized by function. Flink provides three main user-facing APIs (recommended in this order: SQL, Table API, DataStream API) plus a newer DataStream v2 API. 
+ +### Core Infrastructure + +- `flink-annotations` — Stability annotations (`@Public`, `@PublicEvolving`, `@Internal`, `@Experimental`) and `@VisibleForTesting` +- `flink-core-api` — Core API interfaces (functions, state, types) shared by all APIs +- `flink-core` — Core implementation (type system, serialization, memory management, configuration) +- `flink-runtime` — Distributed runtime (JobManager, TaskManager, scheduling, network, state) +- `flink-clients` — CLI and client-side job submission +- `flink-rpc/` — RPC framework + - `flink-rpc-core` — RPC interfaces + - `flink-rpc-akka`, `flink-rpc-akka-loader` — Pekko-based RPC implementation + +### SQL / Table API (recommended API for most users) + +- `flink-table/` + - `flink-sql-parser` — SQL parser (extends Calcite SQL parser) + - `flink-table-common` — Shared types, descriptors, catalog interfaces + - `flink-table-api-java` — Table API for Java + - `flink-table-api-scala` — Table API for Scala + - `flink-table-api-bridge-base`, `flink-table-api-java-bridge`, `flink-table-api-scala-bridge` — Bridges between Table and DataStream APIs + - `flink-table-api-java-uber` — Uber JAR for Table API + - `flink-table-planner` — SQL/Table query planning and optimization (Calcite-based) + - `flink-table-planner-loader`, `flink-table-planner-loader-bundle` — Classloader isolation for planner + - `flink-table-runtime` — Runtime operators for Table/SQL queries + - `flink-table-calcite-bridge` — Bridge to Apache Calcite + - `flink-sql-gateway-api`, `flink-sql-gateway` — SQL Gateway for remote SQL execution + - `flink-sql-client` — Interactive SQL CLI + - `flink-sql-jdbc-driver`, `flink-sql-jdbc-driver-bundle` — JDBC driver for SQL Gateway + - `flink-table-code-splitter` — Code generation utilities + - `flink-table-test-utils` — Test utilities for Table/SQL + +### DataStream API (original streaming API) + +- `flink-streaming-java` — DataStream API and stream processing operator implementations + +### DataStream API v2 (newer 
event-driven API) + +- `flink-datastream-api` — DataStream v2 API definitions +- `flink-datastream` — DataStream v2 API implementation + +### Connectors (in-tree) + +- `flink-connectors/` + - `flink-connector-base` — Base classes for source/sink connectors + - `flink-connector-files` — Unified file system source and sink + - `flink-connector-datagen` — DataGen source for testing + - `flink-connector-datagen-test` — Tests for DataGen connector + - `flink-hadoop-compatibility` — Hadoop InputFormat/OutputFormat compatibility + - `flink-file-sink-common` — Common file sink utilities +- Most connectors (Kafka, JDBC, Elasticsearch, etc.) live in separate repos under [github.com/apache](https://github.com/apache); see README.md for the full list + +### Formats + +- `flink-formats/` + - `flink-json`, `flink-csv`, `flink-avro`, `flink-parquet`, `flink-orc`, `flink-protobuf` — Serialization formats + - `flink-avro-confluent-registry` — Avro with Confluent Schema Registry + - `flink-sequence-file`, `flink-compress`, `flink-hadoop-bulk`, `flink-orc-nohive` — Hadoop-related formats + - `flink-format-common` — Shared format utilities + - `flink-sql-json`, `flink-sql-csv`, `flink-sql-avro`, `flink-sql-parquet`, `flink-sql-orc`, `flink-sql-protobuf` — SQL-layer format integrations + - `flink-sql-avro-confluent-registry` — SQL-layer Avro with Confluent Schema Registry + +### State Backends + +- `flink-state-backends/` + - `flink-statebackend-rocksdb` — RocksDB state backend + - `flink-statebackend-forst` — ForSt state backend (experimental; a fork of RocksDB) + - `flink-statebackend-heap-spillable` — Heap-based spillable state backend + - `flink-statebackend-changelog` — Changelog state backend + - `flink-statebackend-common` — Shared state backend utilities +- `flink-dstl/flink-dstl-dfs` — State changelog storage (DFS-based persistent changelog for incremental checkpointing) + +### File Systems + +- `flink-filesystems/` + - `flink-hadoop-fs` — Hadoop FileSystem abstraction + - 
`flink-s3-fs-hadoop`, `flink-s3-fs-presto`, `flink-s3-fs-base` — S3 file systems + - `flink-oss-fs-hadoop` — Alibaba OSS + - `flink-azure-fs-hadoop` — Azure Blob Storage + - `flink-gs-fs-hadoop` — Google Cloud Storage + - `flink-fs-hadoop-shaded` — Shaded Hadoop dependencies + +### Queryable State + +- `flink-queryable-state/` + - `flink-queryable-state-runtime` — Server-side queryable state service + - `flink-queryable-state-client-java` — Client for querying operator state from running jobs + +### Deployment + +- `flink-kubernetes` — Kubernetes integration +- `flink-yarn` — YARN integration +- `flink-dist`, `flink-dist-scala` — Distribution packaging +- `flink-container` — Container entry-point and utilities for containerized deployments + +### Metrics + +- `flink-metrics/` + - `flink-metrics-core` — Metrics API and core implementation + - Reporter implementations: `flink-metrics-jmx`, `flink-metrics-prometheus`, `flink-metrics-datadog`, `flink-metrics-statsd`, `flink-metrics-graphite`, `flink-metrics-influxdb`, `flink-metrics-slf4j`, `flink-metrics-dropwizard`, `flink-metrics-otel` + +### Libraries + +- `flink-libraries/` + - `flink-cep` — Complex Event Processing + - `flink-state-processing-api` — Offline state access (savepoint reading/writing) + +### Other + +- `flink-models` — AI model integration (sub-modules: `flink-model-openai`, `flink-model-triton`) +- `flink-python` — PyFlink (Python API) +- `flink-runtime-web` — Web UI for JobManager dashboard +- `flink-external-resources` — External resource management (e.g., GPU) +- `docs/` — Documentation content (Hugo site). This is where user-facing docs are written. 
+- `flink-docs` — Documentation build module (auto-generated config reference docs) +- `flink-examples` — Example programs +- `flink-quickstart` — Maven archetype for new projects +- `flink-walkthroughs` — Tutorial walkthrough projects + +### Testing + +- `flink-tests` — Integration tests +- `flink-end-to-end-tests` — End-to-end tests +- `flink-test-utils-parent` — Test utility classes +- `flink-yarn-tests` — YARN-specific tests +- `flink-fs-tests` — FileSystem tests +- `flink-architecture-tests` — ArchUnit architectural boundary tests +- `tools/ci/flink-ci-tools` — CI tooling + +## Architecture Boundaries + +1. **Client** submits jobs to the cluster. Submission paths include the CLI (`bin/flink run` via `flink-clients`), the SQL Client (`bin/sql-client.sh` via `flink-sql-client`), the SQL Gateway (`flink-sql-gateway`, also accessible via JDBC driver), the REST API (direct HTTP to JobManager), programmatic execution (`StreamExecutionEnvironment.execute()` or `TableEnvironment.executeSql()`), and PyFlink (`flink-python`, wraps the Java APIs). +2. **JobManager** (`flink-runtime`) orchestrates execution: receives jobs, creates the execution graph, manages scheduling, coordinates checkpoints, and handles failover. Never runs user code directly. +3. **TaskManager** (`flink-runtime`) executes the user's operators in task slots. Manages network buffers, state backends, and I/O. +4. **Table Planner** (`flink-table-planner`) translates SQL/Table API programs into DataStream programs. The planner is loaded in a separate classloader (`flink-table-planner-loader`) to isolate Calcite dependencies. +5. **Connectors** communicate with external systems. Source connectors implement the `Source` API (FLIP-27); sinks implement the `Sink` API (package `sink2`). Most connectors are externalized to separate repositories. +6. **State Backends** persist keyed state and operator state. RocksDB is the primary backend for production use. +7. 
**Checkpointing** provides exactly-once guarantees. The JobManager coordinates barriers through the data stream; TaskManagers snapshot local state to a distributed file system. + +Key separations: + +- **Planner vs Runtime:** The table planner generates code and execution plans; the runtime executes them. Changes to planning logic live in `flink-table-planner`; changes to runtime operators live in `flink-table-runtime` or `flink-streaming-java`. +- **API vs Implementation:** Public API surfaces (`flink-core-api`, `flink-datastream-api`, `flink-table-api-java`) are separate from implementation modules. API stability annotations control what users can depend on. +- **ArchUnit enforcement:** `flink-architecture-tests/` contains ArchUnit tests that enforce module boundaries. New violations should be avoided; if unavoidable, follow the freeze procedure in `flink-architecture-tests/README.md`. + +## Common Change Patterns + +This section maps common types of Flink changes to the modules they touch and the verification they require. + +### Adding a new SQL built-in function + +1. Register in `flink-table-common` in `BuiltInFunctionDefinitions.java` (definition, input/output type strategies, runtime class reference) +2. Implement in `flink-table-runtime` under `functions/` (extend the appropriate base class: `BuiltInScalarFunction`, `BuiltInTableFunction`, `BuiltInAggregateFunction`, or `BuiltInProcessTableFunction`) +3. Add tests in `flink-table-planner` and `flink-table-runtime` +4. Extend Table API support +5. Document in `docs/` +6. See [flink-table/flink-table-planner/AGENTS.md](flink-table/flink-table-planner/AGENTS.md) and [flink-table/flink-table-runtime/AGENTS.md](flink-table/flink-table-runtime/AGENTS.md) for detailed patterns + +### Adding a new configuration option + +1. Define `ConfigOption` in the relevant config class (e.g., `ExecutionConfigOptions.java` in `flink-table-api-java`) +2. 
Use `ConfigOptions.key("table.exec....")` builder with type, default value, and description +3. Add `@Documentation.TableOption` annotation for auto-generated docs +4. Document in `docs/` if user-facing +5. Verify: unit test for default value, ITCase for behavior change + +### Adding a new table operator (e.g., join type, aggregate) + +1. Involves `flink-table-runtime` (operator), `flink-table-planner` (ExecNode, physical/logical rules), and tests across both +2. See [flink-table/flink-table-planner/AGENTS.md](flink-table/flink-table-planner/AGENTS.md) and [flink-table/flink-table-runtime/AGENTS.md](flink-table/flink-table-runtime/AGENTS.md) for detailed development order and testing patterns + +### Adding a new connector (Source or Sink) + +1. Implement the `Source` API (`flink-connector-base`): `SplitEnumerator`, `SourceReader`, `SourceSplit`, serializers (`SimpleVersionedSerializer`) +2. Or implement the `Sink` API (package `sink2`) for sinks +3. Most new connectors go in separate repos under `github.com/apache`, not in the main Flink repo +4. Verify: unit tests + ITCase with real or embedded external system + +### Modifying state serializers + +1. Changes to `TypeSerializer` require a corresponding `TypeSerializerSnapshot` for migration +2. Bump version in `getCurrentVersion()`, handle old versions in `readSnapshot()` +3. Snapshot must have no-arg constructor for reflection-based deserialization +4. Implement `resolveSchemaCompatibility()` for upgrade paths +5. Verify: serializer snapshot migration tests, checkpoint restore tests across versions + +### Introducing or changing user-facing APIs (`@Public`, `@PublicEvolving`, `@Experimental`) + +1. New user-facing API requires a voted FLIP (Flink Improvement Proposal); this applies to `@Public`, `@PublicEvolving`, and `@Experimental` since users build against all three +2. Every user-facing API class and method must carry a stability annotation +3. 
Changes to existing `@Public` or `@PublicEvolving` API must maintain backward compatibility +4. `@Internal` APIs can be changed freely; users should not depend on them +5. Update JavaDoc on the changed class/method +6. Add to release notes +7. Verify: ArchUnit tests pass, no new architecture violations + +## Coding Standards + +- **Format Java files with Spotless immediately after editing:** `./mvnw spotless:apply`. Uses google-java-format with AOSP style. +- **Scala formatting:** Spotless + scalafmt (config at `.scalafmt.conf`, maxColumn 100). +- **Checkstyle:** `tools/maven/checkstyle.xml` (version defined in root `pom.xml` as `checkstyle.version`). Some modules (flink-core, flink-optimizer, flink-runtime) are not covered by checkstyle enforcement, but conventions should still be followed. +- **No new Scala code.** All Flink Scala APIs are deprecated per FLIP-265. Write all new code in Java. +- **Apache License 2.0 header** required on all new files (enforced by Apache Rat). Use an HTML comment for markdown files. +- **API stability annotations:** Every user-facing API class and method must have a stability annotation. `@Public` (stable across minor releases), `@PublicEvolving` (may change in minor releases), `@Experimental` (may change at any time). These are all part of the public API surface that users build against. `@Internal` marks APIs with no stability guarantees that users should not depend on. +- **Logging:** Use parameterized log statements (SLF4J `{}` placeholders), never string concatenation. +- **No Java serialization** for new features (except internal RPC message transport). +- **Use `final`** for variables and fields where applicable. +- **Comments:** Do not add unnecessary comments that restate what the code does. Add comments that explain "the why" where relevant. +- **Reuse existing code.** Before implementing new utilities or abstractions, search for existing ones in the codebase. Prioritize architecture consistency and code reusability. 
+- Full code style guide: https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/ + +## Testing Standards + +- Add tests for new behavior, covering success, failure, and edge cases. +- Use **JUnit 5** + **AssertJ** assertions. Do not use JUnit 4 or Hamcrest in new test code. +- Prefer real test implementations over Mockito mocks where possible. +- **Integration tests:** Name classes with `ITCase` suffix (e.g., `MyFeatureITCase.java`). +- **Red-green verification:** For bug fixes, verify that new tests actually fail without the fix before confirming they pass with it. +- **Test location** mirrors source structure within each module. +- Follow the testing conventions at https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing + +## Commits and PRs + +### Commit message format + +- `[FLINK-XXXX][component] Description` where FLINK-XXXX is the JIRA issue number +- `[hotfix][component] Description` for typo fixes without JIRA +- Each commit must have a meaningful message including the JIRA ID. If you don't know the ticket number, ask. 
+- Separate cleanup/refactoring from functional changes into distinct commits +- When AI tools were used: add `Generated-by: ` trailer per [ASF generative tooling guidance](https://www.apache.org/legal/generative-tooling.html) + +### Pull request conventions + +- Title format: `[FLINK-XXXX][component] Title of the pull request` +- A corresponding JIRA issue is required (except hotfixes for typos) +- Fill out the PR template completely but concisely: describe purpose, change log, testing approach, impact assessment +- Each PR should address exactly one issue +- Ensure `./mvnw clean verify` passes before opening a PR +- Always push to your fork, not directly to `apache/flink` +- Rebase onto the latest target branch before submitting + +### AI-assisted contributions + +- Disclose AI usage by checking the AI disclosure checkbox and uncommenting the `Generated-by` line in the PR template +- Add `Generated-by: ` to commit messages +- Never add `Co-Authored-By` with an AI agent as co-author; agents are assistants, not authors +- You must be able to explain every change and respond to review feedback substantively + +## Boundaries + +### Ask first + +- Adding or changing `@Public`, `@PublicEvolving`, or `@Experimental` annotations (these are user-facing API commitments requiring a FLIP) +- Large cross-module refactors +- New dependencies +- Changes to serialization formats (affects state compatibility) +- Changes to checkpoint/savepoint behavior +- Changes that could impact performance on hot paths (per-record processing, serialization, state access) + +### Never + +- Commit secrets, credentials, or tokens +- Push directly to `apache/flink`; always work from your fork +- Mix unrelated changes into one PR +- Use Java serialization for new features +- Edit generated files by hand when a generation workflow exists +- Use the legacy `SourceFunction` or `SinkFunction` interfaces for connectors; use the `Source` API (FLIP-27) and `Sink` API (package `sink2`) instead +- Add 
`Co-Authored-By` with an AI agent as co-author in commit messages; AI agents are assistants, not authors. Use `Generated-by: ` instead. +- Suppress or bypass checkstyle rules (no `CHECKSTYLE:ON`/`CHECKSTYLE:OFF` comments, no adding entries to `tools/maven/suppressions.xml`, no `@SuppressWarnings`). Fix the code to satisfy checkstyle instead. +- Use destructive git operations unless explicitly requested + +## References + +- [README.md](README.md) — Build instructions and project overview +- [DEVELOPMENT.md](DEVELOPMENT.md) — IDE setup and development environment +- [.github/CONTRIBUTING.md](.github/CONTRIBUTING.md) — Contribution process +- [.github/PULL_REQUEST_TEMPLATE.md](.github/PULL_REQUEST_TEMPLATE.md) — PR checklist +- [Code Style Guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-preamble/) — Detailed coding guidelines +- [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) — AI tooling policy diff --git a/flink-table/flink-table-planner/AGENTS.md b/flink-table/flink-table-planner/AGENTS.md new file mode 100644 index 0000000000000..9dd6ad389b17e --- /dev/null +++ b/flink-table/flink-table-planner/AGENTS.md @@ -0,0 +1,111 @@ + + +# flink-table-planner + +Translates and optimizes SQL/Table API programs into executable plans using Apache Calcite. Bridges the Table/SQL API and the runtime by generating code and execution plans. The planner is loaded in a separate classloader (`flink-table-planner-loader`) to isolate Calcite dependencies. + +See also [README.md](README.md) for Immutables rule config conventions and JSON plan test regeneration. 
+ +## Build Commands + +Full table modules rebuild: + +``` +./mvnw clean install -T1C -DskipTests -Pskip-webui-build -pl flink-table/flink-table-common,flink-table/flink-sql-parser,flink-table/flink-table-planner-loader,flink-table/flink-table-planner,flink-table/flink-table-api-java -am +``` + +After the first full build, drop `-am` for faster rebuilds when you're only changing code within these modules. + +## Key Directory Structure + +- `plan/rules/physical/stream/` and `plan/rules/physical/batch/` — Physical planner rules +- `plan/rules/logical/` — Logical optimization rules +- `plan/nodes/exec/stream/` and `plan/nodes/exec/batch/` — ExecNodes (bridge between planner and runtime) +- `plan/nodes/exec/spec/` — Serializable operator specifications (JoinSpec, WindowSpec, etc.) +- `plan/nodes/physical/stream/` and `plan/nodes/physical/batch/` — Intermediate physical nodes (Calcite-based) +- `plan/nodes/logical/` — Logical nodes (Calcite-based) +- `codegen/` — Code generation +- `codegen/calls/` — Custom code generators for specific functions (e.g., `JsonCallGen.scala`) +- `functions/casting/` — Cast rules for code generation (e.g., `BinaryToBinaryCastRule`, `StringToTimeCastRule`) +- `functions/` — Function management and inference +- `catalog/` — Catalog integration + +## Key Abstractions + +- **ExecNode**: Bridge between planner and runtime. Annotated with `@ExecNodeMetadata(name, version, minPlanVersion, minStateVersion)` for versioning and backwards compatibility. Extends `ExecNodeBase`, implements `StreamExecNode`. +- **Physical rules**: Extend `RelRule`, use Immutables `@Value.Immutable` for config. Transform logical nodes to physical nodes. Registered in `FlinkStreamRuleSets` and/or `FlinkBatchRuleSets`. +- **Logical optimization rules**: Also extend `RelRule`, often use `RexShuttle` for expression rewriting. Registered in rule sets. +- **Specs**: Serializable specifications in `plan/nodes/exec/spec/` (JoinSpec, WindowSpec, etc.) 
that carry operator configuration. + +## Common Change Patterns + +### Adding a new table operator + +Components involved (can be developed top-down or bottom-up): + +1. **Runtime operator** in `flink-table-runtime` under `operators/` (extend `TableStreamOperator`, implement `OneInputStreamOperator` or `TwoInputStreamOperator`). Test with harness tests. See [flink-table-runtime AGENTS.md](../flink-table-runtime/AGENTS.md). +2. **ExecNode** in `plan/nodes/exec/stream/` and/or `plan/nodes/exec/batch/` (extend `ExecNodeBase`, implement `StreamExecNode`, annotate with `@ExecNodeMetadata`) +3. **Physical Node + Physical Rules** in `plan/rules/physical/stream/` and/or `plan/rules/physical/batch/` (extend `RelRule`, use `@Value.Immutable` config pattern) +4. **Logical Node + Planner rule** +5. Tests: semantic tests, plan tests, restore tests (if stateful) + +Both `stream/` and `batch/` directories exist for rules and ExecNodes. Consider whether your change applies to one or both. + +### Adding a planner optimization rule + +1. Create rule class extending `RelRule` with `@Value.Immutable` config +2. Register in `FlinkStreamRuleSets.scala` and/or `FlinkBatchRuleSets.scala` +3. Test with plan tests using XML golden files (before/after comparison) +4. No runtime changes needed (optimization is compile-time only) + +### Extending SQL syntax + +1. Modify parser grammar in `flink-sql-parser` (`parserImpls.ftl`) +2. Add operation conversion logic in `SqlNodeToOperationConversion.java` +3. Test with parser tests and SQL gateway integration tests (`.q` files) + +### Code generation changes + +- Cast rules live in `functions/casting/`. Each extends `AbstractExpressionCodeGeneratorCastRule` or similar. +- Custom call generators for functions live in `codegen/calls/` (e.g., `JsonCallGen.scala`). Simple scalar functions typically don't need these; the planner handles them uniformly through the function definition. 
+- Immutables library is used for rule configs (`@Value.Immutable`, `@Value.Enclosing`). See [README.md](README.md). + +### Plan serialization changes + +- ExecNode specs use Jackson for JSON serialization. Source/sink specs should use `@JsonIgnoreProperties(ignoreUnknown = true)` for forward compatibility. +- When adding new ExecNode features, update `RexNodeJsonDeserializer` or related serde classes if new function kinds or types are introduced. + +### ExecNode versioning + +When bumping an ExecNode version, update the `@ExecNodeMetadata` annotation's `version` and `minPlanVersion`/`minStateVersion` fields. Add restore test snapshots for the new version. + +### Configuration options + +New features often introduce `ExecutionConfigOptions` entries (in `flink-table-api-java`) for runtime tunability (e.g., cache sizes, timeouts, batch sizes). + +## Testing Patterns + +Choose test types based on what you're changing: + +- **Semantic tests** (for ExecNode/operator changes): Use `SemanticTestBase` (streaming) or `BatchSemanticTestBase` (batch) in `plan/nodes/exec/testutils/`. Extends `CommonSemanticTestBase` which implements `TableTestProgramRunner`. Prefer these over ITCase for operators and ExecNodes. +- **Restore tests** (for stateful operators): Use `RestoreTestBase` or `BatchRestoreTestBase` in `plan/nodes/exec/testutils/`. Implements `TableTestProgramRunner`, uses `@ExtendWith(MiniClusterExtension.class)`. Required when your operator uses state. Tests savepoint creation and job restart in two phases: (1) generate compiled plans + savepoints, (2) verify recovery. +- **Plan tests** (for optimization rules): Verify the generated execution plan using XML golden files. Used for logical and physical optimization rules. +- **ITCase** (for built-in functions): Function tests typically use ITCase with `TestSetSpec` for end-to-end verification (e.g., `JsonFunctionsITCase`, `TimeFunctionsITCase`). 
+- **JSON plan test regeneration:** Set `PLAN_TEST_FORCE_OVERWRITE=true` environment variable (documented in [README.md](README.md)). diff --git a/flink-table/flink-table-runtime/AGENTS.md b/flink-table/flink-table-runtime/AGENTS.md new file mode 100644 index 0000000000000..cd61620f20cd1 --- /dev/null +++ b/flink-table/flink-table-runtime/AGENTS.md @@ -0,0 +1,78 @@ + + +# flink-table-runtime + +Contains classes required by TaskManagers for execution of table programs. Implements runtime operators, built-in functions, and code generation support. Bundles janino (Java compiler for code generation) and flink-shaded-jsonpath. + +## Key Directory Structure + +- `functions/scalar/` — Scalar function implementations (47+) +- `functions/aggregate/` — Aggregate function implementations +- `functions/table/` — Table function implementations +- `functions/ptf/` — Process table function implementations +- `functions/ml/` — Machine learning function implementations +- `operators/` — Runtime operators organized by type: + - `join/` (hash, sort-merge, lookup, temporal, interval, delta, adaptive, stream/multi-join) + - `aggregate/` (group, window) + - `window/` (TVF windows, group windows) + - `deduplicate/`, `rank/`, `sort/` + - `sink/`, `source/` + - `correlate/` (including `async/` for async table functions) + - `calc/`, `match/`, `over/`, `process/`, `ml/`, `search/` + +## Common Change Patterns + +### Adding a built-in function + +Base classes by function type: + +- **Scalar:** Extend `BuiltInScalarFunction` in `functions/scalar/` +- **Table:** Extend `BuiltInTableFunction` in `functions/table/` +- **Aggregate:** Extend `BuiltInAggregateFunction` in `functions/aggregate/` +- **Process Table Function:** Extend `BuiltInProcessTableFunction` in `functions/ptf/` + +All are constructed from `BuiltInFunctionDefinition#specialize(SpecializedContext)` and work on internal data structures by default. 
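
The scalar-function shape described above can be sketched as follows. This is a schematic stand-in, not Flink code: the nested `StringData` and `BuiltInScalarFunction` stubs below only mirror the names of the real classes (from `flink-table-common` and this module) so the sketch compiles stand-alone, and the real constructor wiring through `BuiltInFunctionDefinition#specialize(SpecializedContext)` is omitted.

```java
import java.util.Locale;

/**
 * Schematic sketch of a scalar built-in function. The nested stubs stand in
 * for the real Flink classes only so this compiles on its own; they are NOT
 * the Flink API.
 */
public class ScalarFunctionSketch {

    /** Stand-in for org.apache.flink.table.data.StringData (internal string type). */
    static final class StringData {
        private final String value;
        private StringData(String value) { this.value = value; }
        static StringData fromString(String value) { return new StringData(value); }
        @Override public String toString() { return value; }
    }

    /** Stand-in for BuiltInScalarFunction; the real one is constructed from
     *  BuiltInFunctionDefinition#specialize(SpecializedContext). */
    abstract static class BuiltInScalarFunction {}

    /** The sketch itself: eval() works on internal data and handles SQL NULL. */
    static final class UpperFunction extends BuiltInScalarFunction {
        StringData eval(StringData input) {
            if (input == null) {
                return null; // built-ins must propagate SQL NULL explicitly
            }
            return StringData.fromString(input.toString().toUpperCase(Locale.ROOT));
        }
    }

    public static void main(String[] args) {
        UpperFunction upper = new UpperFunction();
        System.out.println(upper.eval(StringData.fromString("flink"))); // FLINK
        System.out.println(upper.eval(null)); // null
    }
}
```

The point of the sketch is the contract, not the stubs: `eval` operates on internal data structures and must be explicitly null-aware.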
+ +Some functions also require custom code generators in the planner (e.g., `JsonCallGen.scala` for JSON functions). Simple scalar functions typically don't need planner changes; the planner handles them uniformly through the function definition. + +### Adding a runtime operator + +- **1 or 2 inputs:** Extend `TableStreamOperator` (which extends `AbstractStreamOperator`) and implement `OneInputStreamOperator` or `TwoInputStreamOperator` +- **3+ inputs:** Extend `AbstractStreamOperatorV2` and implement `MultipleInputStreamOperator` (see `StreamingMultiJoinOperator`) +- `TableStreamOperator` provides watermark tracking (`currentWatermark`), memory size computation, and a `ContextImpl` for timer services + +### Async operators and runners + +- **Key-ordered async execution:** `operators/join/lookup/keyordered/` contains async execution controller infrastructure (`AecRecord`, `Epoch`, `EpochManager`, `KeyAccountingUnit`, `RecordsBuffer`) for ordering guarantees in async lookup joins +- **Async correlate:** `operators/correlate/async/` for async table function support +- **Runner abstraction:** `AbstractFunctionRunner` and `AbstractAsyncFunctionRunner` provide base classes for code-generated function invocations (used by lookup join, ML predict, vector search runners) + +### State serializer migrations + +- When modifying state serializers, create a `TypeSerializerSnapshot` with version bumping +- Migration test resources follow naming: `migration-flink----snapshot` +- Rescaling tests verify state redistribution across parallelism changes (see `SinkUpsertMaterializerMigrationTest`, `SinkUpsertMaterializerRescalingTest`) + +## Testing Patterns + +- **Harness tests:** Use `OneInputStreamOperatorTestHarness` with `RowDataHarnessAssertor` for output validation. See `operators/join/LookupJoinHarnessTest.java` as a reference. 
+- **Test utilities:** `StreamRecordUtils.insertRecord()` for test records, `RowDataHarnessAssertor` for assertions +- **Operator test base classes:** Module has dedicated base classes per operator type (e.g., `TemporalTimeJoinOperatorTestBase`, `Int2HashJoinOperatorTestBase`, `WindowAggOperatorTestBase`) +- **State migration tests:** Use snapshot files per Flink version and state backend type to verify forward/backward compatibility
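
The version-dispatch pattern behind the serializer migrations and migration tests above can be sketched stand-alone. Nothing below is Flink API (the real `TypeSerializerSnapshot` contract lives in flink-core); this only shows the shape: bump the current version when the written format changes, and keep the read path able to decode every older version.

```java
import java.io.*;

/**
 * Stand-alone sketch of snapshot version dispatch. Illustrative only; the
 * class and method names are local stand-ins, not Flink's interfaces.
 */
public class SnapshotVersioningSketch {

    static final int CURRENT_VERSION = 2; // was 1 before the flag byte was added

    /** Current (v2) format: serializer name plus a flag byte that v1 lacked. */
    static byte[] writeSnapshot(boolean caseSensitive) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF("string-serializer");
            out.writeBoolean(caseSensitive); // new in version 2
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Restores the flag from a snapshot written with {@code readVersion}. */
    static boolean readSnapshot(int readVersion, byte[] snapshot) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(snapshot));
            in.readUTF(); // serializer name, present in every version
            switch (readVersion) {
                case 1:
                    return true; // v1 had no flag; fall back to the old default
                case 2:
                    return in.readBoolean();
                default:
                    throw new IllegalStateException("Unknown snapshot version: " + readVersion);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // A v1 snapshot carried only the name; current code must still read it.
        ByteArrayOutputStream v1 = new ByteArrayOutputStream();
        new DataOutputStream(v1).writeUTF("string-serializer");
        System.out.println("v1 restore -> " + readSnapshot(1, v1.toByteArray()));
        // Current-version round trip.
        System.out.println("v2 restore -> " + readSnapshot(CURRENT_VERSION, writeSnapshot(false)));
    }
}
```

This mirrors why the snapshot needs a no-arg constructor and an explicit version: the restoring side must be able to instantiate the reader and then branch on the version the bytes were actually written with.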