diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index a9b7ca100d05b..512a62fdae7ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -341,6 +341,7 @@ public List getSubStatements() { final LoadTsFileStatement statement = new LoadTsFileStatement(); statement.databaseLevel = this.databaseLevel; + statement.database = this.database; statement.verifySchema = this.verifySchema; statement.deleteAfterLoad = this.deleteAfterLoad; statement.convertOnTypeMismatch = this.convertOnTypeMismatch; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index 16c78811d0876..a39e212892d7f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -132,6 +132,34 @@ public LoadTsFileManager() { activeLoadAgent.start(); } + private long getCleanupTaskDelayInMs() { + return CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000L; + } + + private void createCleanupTaskIfAbsent(final String uuid) { + synchronized (uuid2CleanupTask) { + if (uuid2CleanupTask.containsKey(uuid)) { + return; + } + + final CleanupTask cleanupTask = new CleanupTask(uuid, getCleanupTaskDelayInMs()); + uuid2CleanupTask.put(uuid, cleanupTask); + cleanupTaskQueue.add(cleanupTask); + } + } + + private void rescheduleCleanupTask(final CleanupTask cleanupTask) { + synchronized (uuid2CleanupTask) { + if (uuid2CleanupTask.get(cleanupTask.uuid) != cleanupTask) { + return; + } + + cleanupTaskQueue.remove(cleanupTask); + cleanupTask.resetScheduledTime(); + cleanupTaskQueue.add(cleanupTask); + } + } + private void registerCleanupTaskExecutor() { PipeDataNodeAgent.runtime() .registerPeriodicalJob( @@ -203,26 +231,13 @@ private void recover() { uuid2WriterManager.put(uuid, writerManager); writerManager.close(); - synchronized (uuid2CleanupTask) { - final CleanupTask cleanupTask = - new CleanupTask( - uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000); - uuid2CleanupTask.put(uuid, cleanupTask); - cleanupTaskQueue.add(cleanupTask); - } + createCleanupTaskIfAbsent(uuid); }); } public void writeToDataRegion(DataRegion dataRegion, LoadTsFilePieceNode pieceNode, String uuid) throws IOException, PageException { - if (!uuid2WriterManager.containsKey(uuid)) { - synchronized (uuid2CleanupTask) { - final CleanupTask cleanupTask = - new CleanupTask(uuid, CONFIG.getLoadCleanupTaskExecutionDelayTimeSeconds() * 1000); - uuid2CleanupTask.put(uuid, cleanupTask); - cleanupTaskQueue.add(cleanupTask); - } - } + createCleanupTaskIfAbsent(uuid); final Optional cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid)); cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning); @@ -304,6 +319,8 @@ public boolean loadAll( return false; } + createCleanupTaskIfAbsent(uuid); + final Optional cleanupTask = Optional.ofNullable(uuid2CleanupTask.get(uuid)); cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning); try { @@ -326,9 +343,10 @@ public boolean deleteAll(String uuid) { private void clean(String uuid) { synchronized (uuid2CleanupTask) { - final CleanupTask cleanupTask = uuid2CleanupTask.get(uuid); + final CleanupTask cleanupTask = uuid2CleanupTask.remove(uuid); if (cleanupTask != null) { cleanupTask.cancel(); + cleanupTaskQueue.remove(cleanupTask); } } @@ -776,12 +794,12 @@ private CleanupTask(String uuid, long delayInMs) { public void markLoadTaskRunning() { isLoadTaskRunning = true; - resetScheduledTime(); + rescheduleCleanupTask(this); } public void markLoadTaskNotRunning() { isLoadTaskRunning = false; - resetScheduledTime(); + rescheduleCleanupTask(this); } public void resetScheduledTime() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java index e08a135d31271..12dffa48e82f3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java @@ -179,7 +179,18 @@ public synchronized LoadTsFileDataCacheMemoryBlock allocateDataCacheMemoryBlock( if (dataCacheMemoryBlock == null) { final long actuallyAllocateMemoryInBytes = tryAllocateFromQuery(MEMORY_TOTAL_SIZE_FROM_QUERY_IN_BYTES >> 2); - dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes); + try { + dataCacheMemoryBlock = new LoadTsFileDataCacheMemoryBlock(actuallyAllocateMemoryInBytes); + } catch (RuntimeException e) { + if (actuallyAllocateMemoryInBytes > 0) { + try { + releaseToQuery(actuallyAllocateMemoryInBytes); + } catch (RuntimeException releaseException) { + e.addSuppressed(releaseException); + } + } + throw e; + } LOGGER.info( "Create Data Cache Memory Block {}, allocate memory {}", dataCacheMemoryBlock, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java new file mode 100644 index 0000000000000..941794bb07414 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatementTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.plan.statement.crud; + +import org.apache.iotdb.db.conf.IoTDBConfig; +import org.apache.iotdb.db.conf.IoTDBDescriptor; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Stream; + +public class LoadTsFileStatementTest { + + @Test + public void testSubStatementsKeepDatabase() throws Exception { + final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + final int originalBatchSize = config.getLoadTsFileSubStatementBatchSize(); + final Path tempDir = Files.createTempDirectory("load-tsfile-sub-statements"); + + try { + config.setLoadTsFileSubStatementBatchSize(1); + Files.createFile(tempDir.resolve("a.tsfile")); + Files.createFile(tempDir.resolve("b.tsfile")); + + final LoadTsFileStatement statement = new LoadTsFileStatement(tempDir.toString()); + statement.setDatabase("test_db"); + + final List subStatements = statement.getSubStatements(); + Assert.assertEquals(2, subStatements.size()); + subStatements.forEach( + subStatement -> Assert.assertEquals("test_db", subStatement.getDatabase())); + } finally { + config.setLoadTsFileSubStatementBatchSize(originalBatchSize); + deleteRecursively(tempDir); + } + } + + private static void deleteRecursively(final Path path) throws IOException { + if (path == null || !Files.exists(path)) { + return; + } + + try (final Stream pathStream = Files.walk(path)) { + pathStream + .sorted(Comparator.reverseOrder()) + .forEach( + currentPath -> { + try { + Files.deleteIfExists(currentPath); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java new file mode 100644 index 0000000000000..f3a1bf7e4116f --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManagerTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.memory; + +import org.apache.iotdb.db.exception.load.LoadRuntimeOutOfMemoryException; + +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicLong; + +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; + +public class LoadTsFileMemoryManagerTest { + + @Test + public void testAllocateDataCacheMemoryBlockDoesNotDoubleCountMemory() throws Exception { + final long allocatedMemoryInBytes = 2L * 1024 * 1024; + final LoadTsFileMemoryManager manager = spy(newMemoryManager()); + + doAnswer( + invocation -> { + setUsedMemorySize(manager, allocatedMemoryInBytes); + return allocatedMemoryInBytes; + }) + .when(manager) + .tryAllocateFromQuery(anyLong()); + + manager.allocateDataCacheMemoryBlock(); + + Assert.assertEquals(allocatedMemoryInBytes, manager.getUsedMemorySizeInBytes()); + Assert.assertNotNull(getDataCacheMemoryBlock(manager)); + } + + @Test + public void testAllocateDataCacheMemoryBlockRollsBackPartialAllocationOnFailure() + throws Exception { + final long allocatedMemoryInBytes = 512L; + final LoadTsFileMemoryManager manager = spy(newMemoryManager()); + + doAnswer( + invocation -> { + setUsedMemorySize(manager, allocatedMemoryInBytes); + return allocatedMemoryInBytes; + }) + .when(manager) + .tryAllocateFromQuery(anyLong()); + doAnswer( + invocation -> { + setUsedMemorySize(manager, 0L); + return null; + }) + .when(manager) + .releaseToQuery(anyLong()); + + try { + manager.allocateDataCacheMemoryBlock(); + Assert.fail("Expected LoadRuntimeOutOfMemoryException"); + } catch (LoadRuntimeOutOfMemoryException e) { + Assert.assertEquals(0L, manager.getUsedMemorySizeInBytes()); + Assert.assertNull(getDataCacheMemoryBlock(manager)); + } + } + + private static LoadTsFileMemoryManager newMemoryManager() throws Exception { + final Constructor constructor = + LoadTsFileMemoryManager.class.getDeclaredConstructor(); + constructor.setAccessible(true); + return constructor.newInstance(); + } + + private static void setUsedMemorySize( + final LoadTsFileMemoryManager manager, final long usedMemorySizeInBytes) throws Exception { + final Field field = LoadTsFileMemoryManager.class.getDeclaredField("usedMemorySizeInBytes"); + field.setAccessible(true); + ((AtomicLong) field.get(manager)).set(usedMemorySizeInBytes); + } + + private static LoadTsFileDataCacheMemoryBlock getDataCacheMemoryBlock( + final LoadTsFileMemoryManager manager) throws Exception { + final Field field = LoadTsFileMemoryManager.class.getDeclaredField("dataCacheMemoryBlock"); + field.setAccessible(true); + return (LoadTsFileDataCacheMemoryBlock) field.get(manager); + } +}