diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index f8f3e199e93aa..fe9513cc69096 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -711,7 +711,7 @@ temporal: Converts a timestamp string to a TIMESTAMP_LTZ. - string1: the timestamp string to parse - - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds). + - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds). - string3: the time zone of the input string (default 'UTC'). Supports zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'. The output precision is inferred from the number of 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6). @@ -722,7 +722,19 @@ temporal: TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai') parses in Shanghai time zone. - sql: TO_TIMESTAMP(string1[, string2]) table: toTimestamp(STRING1[, STRING2]) - description: "Converts date time string string1 with format string2 (by default: 'yyyy-MM-dd HH:mm:ss') to a timestamp, without time zone." + description: | + Converts a datetime string to a TIMESTAMP without time zone. + + - string1: the datetime string to parse. Returns NULL if the string cannot be parsed. + - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSSSS' for 5-digit fractional seconds, 'SSSSSSS' for 7-digit, 'SSSSSSSSS' for nanoseconds). + + The output type is precision-aware (TIMESTAMP(3) through TIMESTAMP(9)): + - 1-arg variant: always returns TIMESTAMP(3). + - 2-arg variant: precision is inferred from the number of trailing 'S' characters in the format pattern (0-9), with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP(3), format 'yyyy-MM-dd HH:mm:ss.SSSSS' returns TIMESTAMP(5), format 'yyyy-MM-dd HH:mm:ss.SSSSSSS' returns TIMESTAMP(7). + + E.g., TO_TIMESTAMP('2023-01-01 00:00:00') parses using default format and returns TIMESTAMP(3), + TO_TIMESTAMP('2023-01-01 12:30:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS') returns TIMESTAMP(5), + TO_TIMESTAMP('2023-01-01 12:30:00.1234567', 'yyyy-MM-dd HH:mm:ss.SSSSSSS') returns TIMESTAMP(7). - sql: CURRENT_WATERMARK(rowtime) description: | Returns the current watermark for the given rowtime attribute, or `NULL` if no common watermark of all upstream operations is available at the current operation in the pipeline. diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 35a546f6d2e4f..6fbc780d78bc2 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -837,7 +837,7 @@ temporal: Converts a timestamp string to a TIMESTAMP_LTZ. - string1: the timestamp string to parse - - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's DateTimeFormatter syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds). + - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSS' for milliseconds, 'SSSSSSSSS' for nanoseconds). - string3: the time zone of the input string (default 'UTC'). Supports zone IDs such as 'UTC', 'Asia/Shanghai', or 'America/Los_Angeles'. The output precision is inferred from the number of 'S' characters in the format pattern, with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP_LTZ(3), format 'yyyy-MM-dd HH:mm:ss.SSSSSS' returns TIMESTAMP_LTZ(6). @@ -848,7 +848,19 @@ temporal: TO_TIMESTAMP_LTZ('2023-01-01 00:00:00', 'yyyy-MM-dd HH:mm:ss', 'Asia/Shanghai') parses in Shanghai time zone. - sql: TO_TIMESTAMP(string1[, string2]) table: toTimestamp(STRING1[, STRING2]) - description: 将格式为 string2(默认为:'yyyy-MM-dd HH:mm:ss')的字符串 string1 转换为 timestamp,不带时区。 + description: | + Converts a datetime string to a TIMESTAMP without time zone. + + - string1: the datetime string to parse. Returns NULL if the string cannot be parsed. + - string2: the format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows Java's [DateTimeFormatter](https://github.com/openjdk/jdk/blob/jdk-11%2B28/src/java.base/share/classes/java/time/format/DateTimeFormatter.java) syntax, where 'S' represents fractional seconds (e.g., 'SSSSS' for 5-digit fractional seconds, 'SSSSSSS' for 7-digit, 'SSSSSSSSS' for nanoseconds). + + The output type is precision-aware (TIMESTAMP(3) through TIMESTAMP(9)): + - 1-arg variant: always returns TIMESTAMP(3). + - 2-arg variant: precision is inferred from the number of trailing 'S' characters in the format pattern (0-9), with a minimum of 3. E.g., format 'yyyy-MM-dd HH:mm:ss.SS' returns TIMESTAMP(3), format 'yyyy-MM-dd HH:mm:ss.SSSSS' returns TIMESTAMP(5), format 'yyyy-MM-dd HH:mm:ss.SSSSSSS' returns TIMESTAMP(7). + + E.g., TO_TIMESTAMP('2023-01-01 00:00:00') parses using default format and returns TIMESTAMP(3), + TO_TIMESTAMP('2023-01-01 12:30:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS') returns TIMESTAMP(5), + TO_TIMESTAMP('2023-01-01 12:30:00.1234567', 'yyyy-MM-dd HH:mm:ss.SSSSSSS') returns TIMESTAMP(7). - sql: CURRENT_WATERMARK(rowtime) description: | 返回给定时间列属性 rowtime 的当前水印,如果管道中的当前操作没有可用的上游操作的公共水印时则为 `NULL`。 diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 7ed3ff77f6e7a..7f5ac7e9327c1 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1600,7 +1600,10 @@ def to_time(self) -> 'Expression': def to_timestamp(self) -> 'Expression': """ Parses a timestamp string in the form "yyyy-MM-dd HH:mm:ss[.SSS]" to a SQL Timestamp. - It's equivalent to `col.cast(DataTypes.TIMESTAMP(3))`. + Returns TIMESTAMP(3). + + For precision-aware parsing, use :func:`~pyflink.table.expressions.to_timestamp` + with a format pattern instead. Example: :: diff --git a/flink-python/pyflink/table/expressions.py b/flink-python/pyflink/table/expressions.py index daf73ae0c216c..6ecc219421344 100644 --- a/flink-python/pyflink/table/expressions.py +++ b/flink-python/pyflink/table/expressions.py @@ -309,12 +309,22 @@ def to_date(date_str: Union[str, Expression[str]], def to_timestamp(timestamp_str: Union[str, Expression[str]], format: Union[str, Expression[str]] = None) -> Expression: """ - Converts the date time string with the given format (by default: 'yyyy-MM-dd HH:mm:ss') - under the 'UTC+0' time zone to a timestamp. - - :param timestamp_str: The date time string - :param format: The format of the string - :return: The date value with TIMESTAMP type. + Converts a datetime string to a TIMESTAMP without time zone. + + The output type is precision-aware (TIMESTAMP(3) through TIMESTAMP(9)): + + - 1-arg variant: always returns TIMESTAMP(3). + - 2-arg variant: precision is inferred from the number of trailing 'S' characters + in the format pattern (0-9), with a minimum of 3. E.g., format + 'yyyy-MM-dd HH:mm:ss.SSSSS' returns TIMESTAMP(5), format + 'yyyy-MM-dd HH:mm:ss.SSSSSSS' returns TIMESTAMP(7). + + :param timestamp_str: The datetime string to parse. Returns NULL if the string cannot be parsed. + :param format: The format pattern (default 'yyyy-MM-dd HH:mm:ss'). The pattern follows + Java's `DateTimeFormatter + `_ + syntax, where 'S' represents fractional seconds. + :return: The timestamp value with TIMESTAMP type. """ if format is None: return _unary_op("toTimestamp", timestamp_str) diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 336284374539e..2040ef743d4e5 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -18,15 +18,23 @@ import unittest from pyflink.table import DataTypes -from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, JsonExistsOnError, \ +from pyflink.table.expression import TimeIntervalUnit, TimePointUnit, \ + JsonExistsOnError, \ JsonValueOnEmptyOrError, JsonType, JsonQueryWrapper, JsonQueryOnEmptyOrError -from pyflink.table.expressions import (col, lit, range_, and_, or_, current_date, - current_time, current_timestamp, current_database, - local_timestamp, local_time, temporal_overlaps, date_format, - timestamp_diff, array, row, map_, row_interval, pi, e, - rand, rand_integer, atan2, negative, concat, concat_ws, uuid, - null_of, log, if_then_else, with_columns, call, - to_timestamp_ltz, from_unixtime, to_date, to_timestamp, +from pyflink.table.expressions import (col, lit, range_, and_, or_, + current_date, + current_time, current_timestamp, + current_database, + local_timestamp, local_time, + temporal_overlaps, date_format, + timestamp_diff, array, row, map_, + row_interval, pi, e, + rand, rand_integer, atan2, negative, + concat, concat_ws, uuid, + null_of, log, if_then_else, with_columns, + call, + to_timestamp_ltz, from_unixtime, to_date, + to_timestamp, convert_tz, unix_timestamp) from pyflink.testing.test_case_utils import PyFlinkTestCase @@ -301,9 +309,9 @@ def test_expressions(self): str(to_timestamp_ltz(expr1, "MM/dd/yyyy HH:mm:ss"))) self.assertEqual("TO_TIMESTAMP_LTZ(a, 'MM/dd/yyyy HH:mm:ss', 'UTC')", str(to_timestamp_ltz(expr1, "MM/dd/yyyy HH:mm:ss", "UTC"))) - self.assertEqual("toTimestamp('1970-01-01 08:01:40')", + self.assertEqual("TO_TIMESTAMP('1970-01-01 08:01:40')", str(to_timestamp('1970-01-01 08:01:40'))) - self.assertEqual("toTimestamp('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')", + self.assertEqual("TO_TIMESTAMP('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')", str(to_timestamp('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss'))) self.assertEqual("temporalOverlaps(cast('2:55:00', TIME(0)), 3600000, " "cast('3:30:00', TIME(0)), 7200000)", diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 6685be8fdbdbc..450d08ad433fe 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -2425,16 +2425,12 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL) public static final BuiltInFunctionDefinition TO_TIMESTAMP = BuiltInFunctionDefinition.newBuilder() - .name("toTimestamp") - .sqlName("TO_TIMESTAMP") + .name("TO_TIMESTAMP") .kind(SCALAR) - .inputTypeStrategy( - or( - sequence(logical(LogicalTypeFamily.CHARACTER_STRING)), - sequence( - logical(LogicalTypeFamily.CHARACTER_STRING), - logical(LogicalTypeFamily.CHARACTER_STRING)))) - .outputTypeStrategy(nullableIfArgs(explicit(TIMESTAMP(3)))) + .inputTypeStrategy(SpecificInputTypeStrategies.TO_TIMESTAMP) + .outputTypeStrategy(SpecificTypeStrategies.TO_TIMESTAMP) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.ToTimestampFunction") .build(); // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java index 9206985aa134a..9c3055f475632 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificInputTypeStrategies.java @@ -183,6 +183,9 @@ public static ArgumentTypeStrategy percentageArray(boolean expectedNullability) */ public static final InputTypeStrategy LEAD_LAG = new LeadLagInputTypeStrategy(); + /** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP}. */ + public static final InputTypeStrategy TO_TIMESTAMP = new ToTimestampInputTypeStrategy(); + /** Type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. */ public static final InputTypeStrategy TO_TIMESTAMP_LTZ = new ToTimestampLtzInputTypeStrategy(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java index 79e49de78cc4e..924446813b139 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/SpecificTypeStrategies.java @@ -121,6 +121,9 @@ public final class SpecificTypeStrategies { public static final TypeStrategy INTERNAL_REPLICATE_ROWS = new InternalReplicateRowsTypeStrategy(); + /** See {@link ToTimestampTypeStrategy}. */ + public static final TypeStrategy TO_TIMESTAMP = new ToTimestampTypeStrategy(); + /** See {@link ToTimestampLtzTypeStrategy}. */ public static final TypeStrategy TO_TIMESTAMP_LTZ = new ToTimestampLtzTypeStrategy(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampInputTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampInputTypeStrategy.java new file mode 100644 index 0000000000000..c047d90d60b69 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampInputTypeStrategy.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.ArgumentCount; +import org.apache.flink.table.types.inference.ArgumentTypeStrategy; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.ConstantArgumentCount; +import org.apache.flink.table.types.inference.InputTypeStrategy; +import org.apache.flink.table.types.inference.Signature; +import org.apache.flink.table.types.inference.Signature.Argument; +import org.apache.flink.table.types.logical.LogicalTypeFamily; + +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.table.types.inference.InputTypeStrategies.logical; + +/** + * Input type strategy for {@link BuiltInFunctionDefinitions#TO_TIMESTAMP} that validates the format + * pattern at compile time when provided as a literal. + */ +@Internal +public class ToTimestampInputTypeStrategy implements InputTypeStrategy { + + private static final ArgumentTypeStrategy CHARACTER_STRING_FAMILY_ARG = + logical(LogicalTypeFamily.CHARACTER_STRING); + + @Override + public ArgumentCount getArgumentCount() { + return ConstantArgumentCount.between(1, 2); + } + + @Override + public Optional> inferInputTypes( + CallContext callContext, boolean throwOnFailure) { + final List result = new ArrayList<>(); + final int numberOfArguments = callContext.getArgumentDataTypes().size(); + + final Optional timestampArg = + CHARACTER_STRING_FAMILY_ARG.inferArgumentType(callContext, 0, throwOnFailure); + if (timestampArg.isEmpty()) { + return Optional.empty(); + } + result.add(timestampArg.get()); + + if (numberOfArguments > 1) { + final Optional patternArg = + validatePatternArgument(callContext, throwOnFailure); + if (patternArg.isEmpty()) { + return Optional.empty(); + } + result.add(patternArg.get()); + } + + return Optional.of(result); + } + + private Optional validatePatternArgument( + final CallContext callContext, final boolean throwOnFailure) { + final Optional patternArg = + CHARACTER_STRING_FAMILY_ARG.inferArgumentType(callContext, 1, throwOnFailure); + if (patternArg.isEmpty()) { + return Optional.empty(); + } + + if (callContext.isArgumentLiteral(1)) { + final Optional patternOpt = callContext.getArgumentValue(1, String.class); + if (patternOpt.isEmpty()) { + return callContext.fail(throwOnFailure, "Pattern can not be a null literal"); + } + try { + DateTimeFormatter.ofPattern(patternOpt.get()); + } catch (IllegalArgumentException e) { + return callContext.fail( + throwOnFailure, + "Invalid pattern for parsing TIMESTAMP: %s", + e.getMessage()); + } + } + + return patternArg; + } + + @Override + public List getExpectedSignatures(FunctionDefinition definition) { + return List.of( + Signature.of(Argument.ofGroup(LogicalTypeFamily.CHARACTER_STRING)), + Signature.of( + Argument.ofGroup(LogicalTypeFamily.CHARACTER_STRING), + Argument.ofGroup("pattern", LogicalTypeFamily.CHARACTER_STRING))); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampTypeStrategy.java new file mode 100644 index 0000000000000..af69dc9d87951 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToTimestampTypeStrategy.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.types.inference.TypeStrategy; +import org.apache.flink.table.utils.DateTimeUtils; + +import java.util.Optional; + +/** + * Type strategy of {@code TO_TIMESTAMP}. Returns {@code TIMESTAMP(3)} for the 1-arg variant and + * infers precision from the format pattern's trailing 'S' count for the 2-arg variant. + */ +@Internal +public class ToTimestampTypeStrategy implements TypeStrategy { + + private static final int DEFAULT_PRECISION = 3; + + @Override + public Optional inferType(CallContext callContext) { + int outputPrecision = DEFAULT_PRECISION; + + if (callContext.getArgumentDataTypes().size() == 2) { + outputPrecision = inferPrecisionFromFormat(callContext); + } + + return Optional.of(DataTypes.TIMESTAMP(outputPrecision).nullable()); + } + + /** + * Infers the output precision from a format string literal. Returns at least {@link + * #DEFAULT_PRECISION}. + */ + private static int inferPrecisionFromFormat(CallContext callContext) { + if (!callContext.isArgumentLiteral(1)) { + return DEFAULT_PRECISION; + } + return callContext + .getArgumentValue(1, String.class) + .map(DateTimeUtils::precisionFromFormat) + .orElse(DEFAULT_PRECISION); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java index 754e694c53a45..fe6cbd4d29056 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java @@ -425,9 +425,9 @@ public static TimestampData parseTimestampData(String dateStr, int precision, Ti /** * Parses a timestamp string with the given format, truncating to millisecond precision. - * Precision is hardcoded to match signature of TO_TIMESTAMP. * - * @see FLINK-14925 + *

Note: For precision-aware parsing, use {@link #parseTimestampData(String, String, int)} + * with {@link #precisionFromFormat(String)} instead. */ public static TimestampData parseTimestampData(String dateStr, String format) { return parseTimestampData(dateStr, format, 3); diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampTypeStrategyTest.java new file mode 100644 index 0000000000000..192f89ebcaa6a --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToTimestampTypeStrategyTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.inference.TypeStrategiesTestBase; + +import java.util.stream.Stream; + +/** Tests for {@link ToTimestampTypeStrategy}. */ +class ToTimestampTypeStrategyTest extends TypeStrategiesTestBase { + + @Override + protected Stream testData() { + return Stream.of( + // 1-arg: always TIMESTAMP(3) + TestSpec.forStrategy( + "TO_TIMESTAMP() returns TIMESTAMP(3)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING()) + .expectDataType(DataTypes.TIMESTAMP(3).nullable()), + // 2-arg: non-literal format defaults to TIMESTAMP(3) + TestSpec.forStrategy( + "TO_TIMESTAMP(, ) defaults to TIMESTAMP(3)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .expectDataType(DataTypes.TIMESTAMP(3).nullable()), + // Format-based precision: SS → TIMESTAMP(3) + TestSpec.forStrategy( + "Format with SS returns TIMESTAMP(3)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SS") + .expectDataType(DataTypes.TIMESTAMP(3).nullable()), + + // Format-based precision: SSS → TIMESTAMP(3) + TestSpec.forStrategy( + "Format with SSS returns TIMESTAMP(3)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSS") + .expectDataType(DataTypes.TIMESTAMP(3).nullable()), + // Format-based precision: SSSSSS → TIMESTAMP(6) + TestSpec.forStrategy( + "Format with SSSSSS returns TIMESTAMP(6)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSSSSS") + .expectDataType(DataTypes.TIMESTAMP(6).nullable()), + // Format-based precision: SSSSSSS → TIMESTAMP(7) + TestSpec.forStrategy( + "Format with SSSSSSS returns TIMESTAMP(7)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSSSSSS") + .expectDataType(DataTypes.TIMESTAMP(7).nullable()), + // Format-based precision: SSSSSSSSS → TIMESTAMP(9) + TestSpec.forStrategy( + "Format with SSSSSSSSS returns TIMESTAMP(9)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss.SSSSSSSSS") + .expectDataType(DataTypes.TIMESTAMP(9).nullable()), + // Format without S → TIMESTAMP(3) + TestSpec.forStrategy( + "Format without S returns TIMESTAMP(3)", + SpecificTypeStrategies.TO_TIMESTAMP) + .inputTypes(DataTypes.STRING(), DataTypes.STRING()) + .calledWithLiteralAt(1, "yyyy-MM-dd HH:mm:ss") + .expectDataType(DataTypes.TIMESTAMP(3).nullable())); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java index 007bcb2a8de4b..c8df8f06cae4e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java @@ -285,8 +285,6 @@ void initNonDynamicFunctions() { BuiltInFunctionDefinitions.UNIX_TIMESTAMP, FlinkSqlOperatorTable.UNIX_TIMESTAMP); definitionSqlOperatorHashMap.put( BuiltInFunctionDefinitions.TO_DATE, FlinkSqlOperatorTable.TO_DATE); - definitionSqlOperatorHashMap.put( - BuiltInFunctionDefinitions.TO_TIMESTAMP, FlinkSqlOperatorTable.TO_TIMESTAMP); // catalog functions definitionSqlOperatorHashMap.put( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java index 817284846d11d..55d6dbd03fae1 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java @@ -819,22 +819,6 @@ public SqlSyntax getSyntax() { OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER), SqlFunctionCategory.STRING); - // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9), - // but we keep TIMESTAMP(3) now because we did not support TIMESTAMP(9) as time attribute. - // See: https://issues.apache.org/jira/browse/FLINK-14925 - public static final SqlFunction TO_TIMESTAMP = - new SqlFunction( - "TO_TIMESTAMP", - SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3), - SqlTypeTransforms.FORCE_NULLABLE), - null, - OperandTypes.or( - OperandTypes.family(SqlTypeFamily.CHARACTER), - OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), - SqlFunctionCategory.TIMEDATE); - public static final SqlFunction TO_DATE = new SqlFunction( "TO_DATE", diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala index a6392ae4a4d0a..32c541b7c1609 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala @@ -325,15 +325,6 @@ object BuiltInMethods { classOf[Long], classOf[TimeZone]) - val STRING_TO_TIMESTAMP = - Types.lookupMethod(classOf[DateTimeUtils], "parseTimestampData", classOf[String]) - - val STRING_TO_TIMESTAMP_WITH_FORMAT = Types.lookupMethod( - classOf[DateTimeUtils], - "parseTimestampData", - classOf[String], - classOf[String]) - val TIMESTAMP_WITH_LOCAL_TIME_ZONE_TO_DATE = Types.lookupMethod( classOf[DateTimeUtils], "timestampWithLocalZoneToDate", diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala index 71ef476424a8c..3ebb669deaed4 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala @@ -191,15 +191,6 @@ object StringCallGen { isCharacterString(operands(1).resultType) => methodGen(BuiltInMethods.STRING_TO_DATE_WITH_FORMAT) - case TO_TIMESTAMP if operands.size == 1 && isCharacterString(operands.head.resultType) => - fallibleMethodGen(BuiltInMethods.STRING_TO_TIMESTAMP) - - case TO_TIMESTAMP - if operands.size == 2 && - isCharacterString(operands.head.resultType) && - isCharacterString(operands(1).resultType) => - fallibleMethodGen(BuiltInMethods.STRING_TO_TIMESTAMP_WITH_FORMAT) - case UNIX_TIMESTAMP if operands.size == 1 && isCharacterString(operands.head.resultType) => methodGen(BuiltInMethods.UNIX_TIMESTAMP_STR) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java index d606dc15313cc..0df4602da75a8 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java @@ -52,6 +52,7 @@ import static org.apache.flink.table.api.Expressions.call; import static org.apache.flink.table.api.Expressions.lit; import static org.apache.flink.table.api.Expressions.temporalOverlaps; +import static org.apache.flink.table.api.Expressions.toTimestamp; import static org.apache.flink.table.api.Expressions.toTimestampLtz; import static org.apache.flink.table.planner.expressions.ExpressionBuilder.literal; @@ -68,6 +69,7 @@ Stream getTestSetSpecs() { temporalOverlapsTestCases(), ceilTestCases(), floorTestCases(), + toTimestampTestCases(), toTimestampLtzTestCases()) .flatMap(s -> s); } @@ -811,6 +813,92 @@ private Stream floorTestCases() { STRING().nullable())); } + private Stream toTimestampTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP) + .onFieldsWithData("unparsable", null) + .andDataTypes(STRING(), STRING().nullable()) + // 1-arg: default format, returns TIMESTAMP(3) + .testResult( + toTimestamp("2023-01-01 12:30:00"), + "TO_TIMESTAMP('2023-01-01 12:30:00')", + LocalDateTime.of(2023, 1, 1, 12, 30, 0), + TIMESTAMP(3).nullable()) + // 1-arg: truncates to precision 3 + .testResult( + toTimestamp("1970-01-01 00:00:00.123456789"), + "TO_TIMESTAMP('1970-01-01 00:00:00.123456789')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123000000), + TIMESTAMP(3).nullable()) + // 2-arg: precision 6 from format + .testResult( + toTimestamp( + "2023-01-01 12:30:00.123456", "yyyy-MM-dd HH:mm:ss.SSSSSS"), + "TO_TIMESTAMP('2023-01-01 12:30:00.123456', 'yyyy-MM-dd HH:mm:ss.SSSSSS')", + LocalDateTime.of(2023, 1, 1, 12, 30, 0, 123456000), + TIMESTAMP(6).nullable()) + // 2-arg: precision 9 from format + .testResult( + toTimestamp( + "1970-01-01 00:00:00.123456789", + "yyyy-MM-dd HH:mm:ss.SSSSSSSSS"), + "TO_TIMESTAMP('1970-01-01 00:00:00.123456789', 'yyyy-MM-dd HH:mm:ss.SSSSSSSSS')", + LocalDateTime.of(1970, 1, 1, 0, 0, 0, 123456789), + TIMESTAMP(9).nullable()) + // 2-arg: precision 2 (clamped to minimum 3) + .testResult( + toTimestamp("2023-01-01 12:30:00.12", "yyyy-MM-dd HH:mm:ss.SS"), + "TO_TIMESTAMP('2023-01-01 12:30:00.12', 'yyyy-MM-dd HH:mm:ss.SS')", + LocalDateTime.of(2023, 1, 1, 12, 30, 0, 120000000), + TIMESTAMP(3).nullable()) + // 2-arg: precision 5 from format + .testResult( + toTimestamp( + "2023-01-01 12:30:00.12345", "yyyy-MM-dd HH:mm:ss.SSSSS"), + "TO_TIMESTAMP('2023-01-01 12:30:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')", + LocalDateTime.of(2023, 1, 1, 12, 30, 0, 123450000), + TIMESTAMP(5).nullable()) + // 2-arg: precision 7 from format + .testResult( + toTimestamp( + "2023-01-01 12:30:00.1234567", + "yyyy-MM-dd HH:mm:ss.SSSSSSS"), + "TO_TIMESTAMP('2023-01-01 12:30:00.1234567', 'yyyy-MM-dd HH:mm:ss.SSSSSSS')", + LocalDateTime.of(2023, 1, 1, 12, 30, 0, 123456700), + TIMESTAMP(7).nullable()) + // 2-arg: SSS format still returns TIMESTAMP(3) + .testResult( + toTimestamp("2017-09-15 00:00:00.12345", "yyyy-MM-dd HH:mm:ss.SSS"), + "TO_TIMESTAMP('2017-09-15 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSS')", + LocalDateTime.of(2017, 9, 15, 0, 0, 0, 123000000), + TIMESTAMP(3).nullable()) + // 2-arg: fewer input digits than format precision + .testResult( + toTimestamp("2023-01-01 00:00:00.1", "yyyy-MM-dd HH:mm:ss.SSSSSS"), + "TO_TIMESTAMP('2023-01-01 00:00:00.1', 'yyyy-MM-dd HH:mm:ss.SSSSSS')", + LocalDateTime.of(2023, 1, 1, 0, 0, 0, 100000000), + TIMESTAMP(6).nullable()) + // unparsable string returns null + .testResult( + toTimestamp($("f0")), + "TO_TIMESTAMP(f0)", + null, + TIMESTAMP(3).nullable()) + // null input returns null + .testResult( + toTimestamp($("f1")), + "TO_TIMESTAMP(f1)", + null, + TIMESTAMP(3).nullable()) + // invalid format pattern fails at validation + .testSqlValidationError( + "TO_TIMESTAMP('2023-01-01', 'un-parsable format')", + "Invalid pattern for parsing TIMESTAMP") + .testTableApiValidationError( + toTimestamp("2023-01-01", "un-parsable format"), + "Invalid pattern for parsing TIMESTAMP")); + } + private Stream toTimestampLtzTestCases() { return Stream.of( TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out index bc5c9365b8261..c7c8c3505dd5a 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeHopWindow.out @@ -31,7 +31,7 @@ } ], "type" : "TIMESTAMP(3)" }, - "serializableString" : "TO_TIMESTAMP(`c`)" + "serializableString" : "`TO_TIMESTAMP`(`c`)" } }, { "name" : "proctime", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out index 8020137092793..7f223bc2fff3c 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeSessionWindow.out @@ -31,7 +31,7 @@ } ], "type" : "TIMESTAMP(3)" }, - "serializableString" : "TO_TIMESTAMP(`c`)" + "serializableString" : "`TO_TIMESTAMP`(`c`)" } }, { "name" : "proctime", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out index fdb06305cb38c..ff371defe9bac 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testEventTimeTumbleWindow.out @@ -31,7 +31,7 @@ } ], "type" : "TIMESTAMP(3)" }, - "serializableString" : "TO_TIMESTAMP(`c`)" + "serializableString" : "`TO_TIMESTAMP`(`c`)" } }, { "name" : "proctime", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out index e0334458a780e..fb93b7d15bc00 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeHopWindow.out @@ -31,7 +31,7 @@ } ], "type" : "TIMESTAMP(3)" }, - "serializableString" : "TO_TIMESTAMP(`c`)" + "serializableString" : "`TO_TIMESTAMP`(`c`)" } }, { "name" : "proctime", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out index cd2e2b67f300c..255fb1ace1570 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeSessionWindow.out @@ -31,7 +31,7 @@ } ], "type" : "TIMESTAMP(3)" }, - "serializableString" : "TO_TIMESTAMP(`c`)" + "serializableString" : "`TO_TIMESTAMP`(`c`)" } }, { "name" : "proctime", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out index 32b9614edc985..93780a477bb2a 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/PythonGroupWindowAggregateJsonPlanTest_jsonplan/testProcTimeTumbleWindow.out @@ -31,7 +31,7 @@ } ], "type" : "TIMESTAMP(3)" }, - "serializableString" : "TO_TIMESTAMP(`c`)" + "serializableString" : "`TO_TIMESTAMP`(`c`)" } }, { "name" : "proctime", diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 225843946f194..111484c487a01 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -1922,7 +1922,7 @@ class TableEnvironmentTest { | a int not null, | b varchar, | c int, - | ts AS to_timestamp(b), + | ts AS TO_TIMESTAMP(b), | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND |) with ( | 'connector' = 'COLLECTION' @@ -2322,7 +2322,7 @@ class TableEnvironmentTest { | f29 AS LOCALTIMESTAMP, | f30 AS CURRENT_TIMESTAMP, | f31 AS CURRENT_ROW_TIMESTAMP(), - | ts AS to_timestamp(f25), + | ts AS TO_TIMESTAMP(f25), | PRIMARY KEY(f24, f26) NOT ENFORCED, | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND |) with ( @@ -2387,7 +2387,7 @@ class TableEnvironmentTest { "TIMESTAMP(3) *ROWTIME*", Boolean.box(true), null, - "AS TO_TIMESTAMP(`f25`)", + "AS `TO_TIMESTAMP`(`f25`)", "`ts` - INTERVAL '1' SECOND") ) val tableResult1 = tableEnv.executeSql("describe T1") @@ -2464,7 +2464,7 @@ class TableEnvironmentTest { | c29 AS LOCALTIMESTAMP, | c30 AS CURRENT_TIMESTAMP comment 'notice: computed column', | c31 AS CURRENT_ROW_TIMESTAMP(), - | ts AS to_timestamp(c25) comment 'notice: watermark', + | ts AS TO_TIMESTAMP(c25) comment 'notice: watermark', | PRIMARY KEY(c24, c26) NOT ENFORCED, | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND |) with ( @@ -2540,7 +2540,7 @@ class TableEnvironmentTest { "TIMESTAMP(3) *ROWTIME*", Boolean.box(true), null, - "AS TO_TIMESTAMP(`c25`)", + "AS `TO_TIMESTAMP`(`c25`)", "`ts` - INTERVAL '1' SECOND", "notice: watermark") ) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala index 15fb90172057c..0c1926b8389a8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala @@ -618,14 +618,6 @@ class TemporalTypesTest extends ExpressionTestBase { testSqlApi("TIME '12:44:31'", "12:44:31") testAllApis(toDate("2018-03-18", "yyyy-MM-dd"), "TO_DATE('2018-03-18')", "2018-03-18") - testAllApis( - toTimestamp("1970-01-01 08:01:40"), - "TO_TIMESTAMP('1970-01-01 08:01:40')", - "1970-01-01 08:01:40.000") - testAllApis( - toTimestamp("1970-01-01 08:01:40", "yyyy-MM-dd HH:mm:ss"), - "TO_TIMESTAMP('1970-01-01 08:01:40', 'yyyy-MM-dd HH:mm:ss')", - "1970-01-01 08:01:40.000") // EXTRACT // testSqlApi("TO_DATE(1521331200)", "2018-03-18") @@ -970,32 +962,6 @@ class TemporalTypesTest extends ExpressionTestBase { testSqlApi("UNIX_TIMESTAMP('2015/07/24 10:00:00.5', 'yyyy/MM/dd HH:mm:ss.S')", "1437699600") } - /** - * now Flink only support TIMESTAMP(3) as the return type in TO_TIMESTAMP See: - * https://issues.apache.org/jira/browse/FLINK-14925 - */ - @Test - def testToTimeStampFunctionWithHighPrecision(): Unit = { - testSqlApi("TO_TIMESTAMP('1970-01-01 00:00:00.123456789')", "1970-01-01 00:00:00.123") - - testSqlApi( - "TO_TIMESTAMP('1970-01-01 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSSSS')", - "1970-01-01 00:00:00.123") - - testSqlApi( - "TO_TIMESTAMP('20000202 59:59.1234567', 'yyyyMMdd mm:ss.SSSSSSS')", - "2000-02-02 00:59:59.123") - - testSqlApi("TO_TIMESTAMP('1234567', 'SSSSSSS')", "1970-01-01 00:00:00.123") - - testSqlApi( - "TO_TIMESTAMP('2017-09-15 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSS')", - "2017-09-15 00:00:00.123") - testSqlApi( - "CAST(TO_TIMESTAMP('2017-09-15 00:00:00.12345', 'yyyy-MM-dd HH:mm:ss.SSS') AS STRING)", - "2017-09-15 00:00:00.123") - } - @Test def testHighPrecisionTimestamp(): Unit = { // EXTRACT should support millisecond/microsecond/nanosecond diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala index b5ae83c3d5663..51344753ddee8 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/TableScanTest.scala @@ -37,7 +37,7 @@ class TableScanTest extends TableTestBase { | a int, | b varchar, | c as a + 1, - | d as to_timestamp(b), + | d as TO_TIMESTAMP(b), | e as my_udf(a) |) with ( | 'connector' = 'values', @@ -50,7 +50,7 @@ class TableScanTest extends TableTestBase { | a int, | b varchar, | c as a + 1, - | d as to_timestamp(b), + | d as TO_TIMESTAMP(b), | e as my_udf(a), | WATERMARK FOR d AS d - INTERVAL '0.001' SECOND |) with ( @@ -160,7 +160,7 @@ class TableScanTest extends TableTestBase { | a int, | b varchar, | c as a + 1, - | d as to_timestamp(b), + | d as TO_TIMESTAMP(b), | e as my_udf(a), | ptime as proctime() |) with ( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala index 9d37d60bcd5f0..2283f7dc61fde 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/physical/stream/WatermarkAssignerChangelogNormalizeTransposeRuleTest.scala @@ -49,7 +49,7 @@ class WatermarkAssignerChangelogNormalizeTransposeRuleTest extends TableTestBase | currency_no STRING, | rate BIGINT, | c STRING, - | currency_time as to_timestamp(c), + | currency_time as TO_TIMESTAMP(c), | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, | PRIMARY KEY(currency) NOT ENFORCED |) WITH ( @@ -66,7 +66,7 @@ class WatermarkAssignerChangelogNormalizeTransposeRuleTest extends TableTestBase | currency_no STRING, | rate BIGINT, | c STRING, - | currency_time as to_timestamp(c), + | currency_time as TO_TIMESTAMP(c), | WATERMARK FOR currency_time AS currency_time - interval '5' SECOND, | PRIMARY KEY(currency) NOT ENFORCED |) WITH ( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala index ea905662a4221..330e693a0b6f5 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/TableScanTest.scala @@ -62,7 +62,7 @@ class TableScanTest extends TableTestBase { | a int, | b varchar, | c as a + 1, - | d as to_timestamp(b), + | d as TO_TIMESTAMP(b), | e as my_udf(a) |) with ( | 'connector' = 'values' @@ -155,7 +155,7 @@ class TableScanTest extends TableTestBase { | a int, | b varchar, | c as a + 1, - | d as to_timestamp(b), + | d as TO_TIMESTAMP(b), | e as my_udf(a), | WATERMARK FOR d AS d - INTERVAL '0.001' SECOND |) with ( @@ -426,7 +426,7 @@ class TableScanTest extends TableTestBase { | a INT, | b AS a + 1, | c STRING, - | ts as to_timestamp(c), + | ts as TO_TIMESTAMP(c), | PRIMARY KEY (id) NOT ENFORCED, | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND |) WITH ( @@ -465,7 +465,7 @@ class TableScanTest extends TableTestBase { | a INT, | b AS a + 1, | c STRING, - | ts as to_timestamp(c), + | ts as TO_TIMESTAMP(c), | PRIMARY KEY (id) NOT ENFORCED, | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND |) WITH ( @@ -485,7 +485,7 @@ class TableScanTest extends TableTestBase { | a INT, | b AS a + 1, | c STRING, - | ts as to_timestamp(c), + | ts as TO_TIMESTAMP(c), | PRIMARY KEY (id) NOT ENFORCED, | WATERMARK FOR ts AS ts - INTERVAL '1' SECOND |) WITH ( diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampFunction.java new file mode 100644 index 0000000000000..72f44994b8496 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampFunction.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP}. + * + *

Supported function signatures: + * + *

    + *
  • {@code TO_TIMESTAMP(string)} -> TIMESTAMP(3)
    + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + *
  • {@code TO_TIMESTAMP(string, format)} -> TIMESTAMP(precision)
    + * Parses string timestamp using the given format. Output precision is inferred from the + * format pattern's trailing 'S' count, with a minimum of 3 for backward compatibility. + *
+ */ +@Internal +public class ToTimestampFunction extends BuiltInScalarFunction { + + public ToTimestampFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.TO_TIMESTAMP, context); + } + + public @Nullable TimestampData eval(@Nullable StringData timestamp) { + if (timestamp == null) { + return null; + } + + try { + return parseTimestampData(timestamp.toString()); + } catch (DateTimeException e) { + return null; + } + } + + public @Nullable TimestampData eval( + @Nullable StringData timestamp, @Nullable StringData format) { + if (timestamp == null || format == null) { + return null; + } + + try { + String formatStr = format.toString(); + return parseTimestampData( + timestamp.toString(), formatStr, DateTimeUtils.precisionFromFormat(formatStr)); + } catch (DateTimeException e) { + return null; + } + } +}